001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import org.apache.avalon.framework.configuration.Configuration;
024: import org.apache.avalon.framework.configuration.ConfigurationException;
025: import org.jacorb.notification.OfferManager;
026: import org.jacorb.notification.SubscriptionManager;
027: import org.jacorb.notification.engine.MessagePushOperation;
028: import org.jacorb.notification.engine.PushTaskExecutorFactory;
029: import org.jacorb.notification.engine.TaskProcessor;
030: import org.jacorb.notification.interfaces.Message;
031: import org.omg.CORBA.ORB;
032: import org.omg.CosEventChannelAdmin.AlreadyConnected;
033: import org.omg.CosEventComm.Disconnected;
034: import org.omg.CosEventComm.PushConsumer;
035: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
036: import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierOperations;
037: import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierPOATie;
038: import org.omg.CosNotifyChannelAdmin.ProxyType;
039: import org.omg.PortableServer.POA;
040: import org.omg.PortableServer.Servant;
041:
042: /**
043: * @jmx.mbean extends = "AbstractProxyPushSupplierMBean"
044: * @jboss.xmbean
045: *
046: * @author Alphonse Bendt
047: * @version $Id: ProxyPushSupplierImpl.java,v 1.24 2006/03/06 19:53:46 alphonse.bendt Exp $
048: */
049:
050: public class ProxyPushSupplierImpl extends AbstractProxyPushSupplier
051: implements ProxyPushSupplierOperations,
052: ProxyPushSupplierImplMBean {
053: private class PushAnyOperation extends MessagePushOperation {
054: public PushAnyOperation(Message message) {
055: super (message);
056: }
057:
058: public void invokePush() throws Disconnected {
059: deliverMessageInternal(message_);
060: }
061: }
062:
063: private PushConsumer pushConsumer_;
064:
065: private long timeSpent_;
066:
067: // //////////////////////////////////////
068:
069: public ProxyPushSupplierImpl(IAdmin admin, ORB orb, POA poa,
070: Configuration conf, TaskProcessor taskProcessor,
071: PushTaskExecutorFactory pushTaskExecutorFactory,
072: OfferManager offerManager,
073: SubscriptionManager subscriptionManager,
074: ConsumerAdmin consumerAdmin) throws ConfigurationException {
075: super (admin, orb, poa, conf, taskProcessor,
076: pushTaskExecutorFactory, offerManager,
077: subscriptionManager, consumerAdmin);
078: }
079:
080: public ProxyType MyType() {
081: return ProxyType.PUSH_ANY;
082: }
083:
084: public void disconnect_push_supplier() {
085: destroy();
086: }
087:
088: protected void disconnectClient() {
089: pushConsumer_.disconnect_push_consumer();
090:
091: pushConsumer_ = null;
092: }
093:
094: private boolean deliverMessageWithRetry(final Message message) {
095: try {
096: deliverMessageInternal(message);
097:
098: return true;
099: } catch (Exception e) {
100: final PushAnyOperation _failedOperation = new PushAnyOperation(
101: message);
102:
103: handleFailedPushOperation(_failedOperation, e);
104:
105: return false;
106: }
107: }
108:
109: private void deliverMessageInternal(final Message message)
110: throws Disconnected {
111: long now = System.currentTimeMillis();
112: pushConsumer_.push(message.toAny());
113: timeSpent_ += (System.currentTimeMillis() - now);
114: resetErrorCounter();
115: }
116:
117: public boolean pushEvent() {
118: final Message _message = getMessageNoBlock();
119:
120: if (_message != null) {
121: try {
122: return deliverMessageWithRetry(_message);
123: } finally {
124: _message.dispose();
125: }
126: }
127:
128: return false;
129: }
130:
131: public void connect_any_push_consumer(PushConsumer pushConsumer)
132: throws AlreadyConnected {
133: checkIsNotConnected();
134:
135: pushConsumer_ = pushConsumer;
136:
137: connectClient(pushConsumer);
138: }
139:
140: protected void connectionResumed() {
141: scheduleFlush();
142: }
143:
144: public Servant newServant() {
145: return new ProxyPushSupplierPOATie(this );
146: }
147:
148: public long getCost() {
149: return timeSpent_;
150: }
151: }
|