001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import org.apache.avalon.framework.configuration.Configuration;
024: import org.apache.avalon.framework.configuration.ConfigurationException;
025: import org.jacorb.notification.OfferManager;
026: import org.jacorb.notification.SubscriptionManager;
027: import org.jacorb.notification.engine.TaskProcessor;
028: import org.jacorb.notification.interfaces.Message;
029: import org.omg.CORBA.Any;
030: import org.omg.CORBA.BooleanHolder;
031: import org.omg.CORBA.ORB;
032: import org.omg.CORBA.UNKNOWN;
033: import org.omg.CosEventChannelAdmin.AlreadyConnected;
034: import org.omg.CosEventComm.Disconnected;
035: import org.omg.CosEventComm.PullConsumer;
036: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
037: import org.omg.CosNotifyChannelAdmin.ProxyPullSupplierOperations;
038: import org.omg.CosNotifyChannelAdmin.ProxyPullSupplierPOATie;
039: import org.omg.CosNotifyChannelAdmin.ProxyType;
040: import org.omg.PortableServer.POA;
041: import org.omg.PortableServer.Servant;
042:
043: /**
044: * @author Alphonse Bendt
045: * @version $Id: ProxyPullSupplierImpl.java,v 1.17 2006/07/07 12:22:13 alphonse.bendt Exp $
046: */
047:
048: public class ProxyPullSupplierImpl extends AbstractProxySupplier
049: implements ProxyPullSupplierOperations {
050: private final Any sUndefinedAny;
051:
052: ////////////////////////////////////////
053:
054: private PullConsumer pullConsumer_ = null;
055:
056: ////////////////////////////////////////
057:
058: public ProxyPullSupplierImpl(IAdmin admin, ORB orb, POA poa,
059: Configuration config, TaskProcessor taskProcessor,
060: OfferManager offerManager,
061: SubscriptionManager subscriptionManager,
062: ConsumerAdmin consumerAdmin) throws ConfigurationException {
063: super (admin, orb, poa, config, taskProcessor, offerManager,
064: subscriptionManager, consumerAdmin);
065:
066: sUndefinedAny = orb.create_any();
067: }
068:
069: public ProxyType MyType() {
070: return ProxyType.PULL_ANY;
071: }
072:
073: public void disconnect_pull_supplier() {
074: destroy();
075: }
076:
077: protected void disconnectClient() {
078: if (pullConsumer_ != null) {
079: logger_.info("disconnect any_pull_consumer");
080:
081: pullConsumer_.disconnect_pull_consumer();
082: pullConsumer_ = null;
083: }
084: }
085:
086: public Any pull() throws Disconnected {
087: checkStillConnected();
088:
089: try {
090: Message _event = getMessageBlocking();
091: try {
092: return _event.toAny();
093: } finally {
094: _event.dispose();
095: }
096: } catch (InterruptedException e) {
097: logger_.fatalError("interrupted", e);
098:
099: throw new UNKNOWN();
100: }
101: }
102:
103: public Any try_pull(BooleanHolder hasEvent) throws Disconnected {
104: checkStillConnected();
105:
106: hasEvent.value = false;
107:
108: Message _message = getMessageNoBlock();
109:
110: if (_message != null) {
111: try {
112: hasEvent.value = true;
113:
114: return _message.toAny();
115: } finally {
116: _message.dispose();
117: }
118: }
119:
120: return sUndefinedAny;
121: }
122:
123: public void connect_any_pull_consumer(PullConsumer consumer)
124: throws AlreadyConnected {
125: logger_.info("connect any_pull_consumer");
126:
127: checkIsNotConnected();
128:
129: pullConsumer_ = consumer;
130:
131: connectClient(consumer);
132: }
133:
134: public void enableDelivery() {
135: // as delivery to this PullSupplier causes no remote calls
136: // we can ignore this
137: }
138:
139: public void disableDelivery() {
140: // as delivery to this PullSupplier causes no remote calls
141: // we can ignore this
142: }
143:
144: public void deliverPendingData() {
145: // as we do not actively deliver events we can ignore this
146: }
147:
148: public Servant newServant() {
149: return new ProxyPullSupplierPOATie(this );
150: }
151:
152: protected long getCost() {
153: return 0;
154: }
155: }
|