001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2004 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import org.apache.avalon.framework.configuration.Configuration;
025: import org.jacorb.notification.MessageFactory;
026: import org.jacorb.notification.OfferManager;
027: import org.jacorb.notification.SubscriptionManager;
028: import org.jacorb.notification.conf.Attributes;
029: import org.jacorb.notification.conf.Default;
030: import org.jacorb.notification.engine.TaskProcessor;
031: import org.jacorb.notification.interfaces.Message;
032: import org.jacorb.notification.interfaces.MessageSupplier;
033: import org.omg.CORBA.BooleanHolder;
034: import org.omg.CORBA.ORB;
035: import org.omg.CosEventChannelAdmin.AlreadyConnected;
036: import org.omg.CosEventComm.Disconnected;
037: import org.omg.CosNotification.StructuredEvent;
038: import org.omg.CosNotifyChannelAdmin.ProxyType;
039: import org.omg.CosNotifyChannelAdmin.StructuredProxyPullConsumerOperations;
040: import org.omg.CosNotifyChannelAdmin.StructuredProxyPullConsumerPOATie;
041: import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
042: import org.omg.CosNotifyComm.StructuredPullSupplier;
043: import org.omg.PortableServer.POA;
044: import org.omg.PortableServer.Servant;
045:
046: /**
047: * @jmx.mbean extends ="AbstractProxyConsumerMBean"
048: * @jboss.xmbean
049: *
050: * @author Alphonse Bendt
051: * @version $Id: StructuredProxyPullConsumerImpl.java,v 1.17 2006/01/12 22:34:54 alphonse.bendt Exp $
052: */
053:
054: public class StructuredProxyPullConsumerImpl extends
055: AbstractProxyConsumer implements
056: StructuredProxyPullConsumerOperations, MessageSupplier,
057: MessageSupplierDelegate, StructuredProxyPullConsumerImplMBean {
058: private StructuredPullSupplier pullSupplier_;
059:
060: private final long pollInterval_;
061:
062: private final PullMessagesUtility pollUtil_;
063:
064: private final PullMessagesOperation pullMessagesOperation_;
065:
066: // //////////////////////////////////////
067:
068: public StructuredProxyPullConsumerImpl(IAdmin admin, ORB orb,
069: POA poa, Configuration config, TaskProcessor taskProcessor,
070: MessageFactory mf, OfferManager offerManager,
071: SubscriptionManager subscriptionManager,
072: SupplierAdmin supplierAdmin) {
073: super (admin, orb, poa, config, taskProcessor, mf,
074: supplierAdmin, offerManager, subscriptionManager);
075:
076: pollInterval_ = config.getAttributeAsLong(
077: Attributes.PULL_CONSUMER_POLL_INTERVAL,
078: Default.DEFAULT_PULL_CONSUMER_POLL_INTERVAL);
079:
080: pollUtil_ = new PullMessagesUtility(taskProcessor, this );
081:
082: pullMessagesOperation_ = new PullMessagesOperation(this );
083: }
084:
085: // //////////////////////////////////////
086:
087: public ProxyType MyType() {
088: return ProxyType.PULL_STRUCTURED;
089: }
090:
091: public void disconnect_structured_pull_consumer() {
092: destroy();
093: }
094:
095: public synchronized void connect_structured_pull_supplier(
096: StructuredPullSupplier pullSupplier)
097: throws AlreadyConnected {
098: checkIsNotConnected();
099: pullSupplier_ = pullSupplier;
100: connectClient(pullSupplier);
101: startTask();
102: }
103:
104: protected void connectionSuspended() {
105: stopTask();
106: }
107:
108: public void connectionResumed() {
109: startTask();
110: }
111:
112: protected void disconnectClient() {
113: stopTask();
114: pullSupplier_.disconnect_structured_pull_supplier();
115:
116: pullSupplier_ = null;
117: }
118:
119: protected void startTask() {
120: pollUtil_.startTask(pollInterval_);
121: }
122:
123: protected void stopTask() {
124: pollUtil_.stopTask();
125: }
126:
127: public Servant newServant() {
128: return new StructuredProxyPullConsumerPOATie(this );
129: }
130:
131: public MessageSupplierDelegate.PullResult pullMessages()
132: throws Disconnected {
133: BooleanHolder _hasEvent = new BooleanHolder();
134: _hasEvent.value = false;
135: StructuredEvent _event = pullSupplier_
136: .try_pull_structured_event(_hasEvent);
137:
138: return new MessageSupplierDelegate.PullResult(_event,
139: _hasEvent.value);
140: }
141:
142: public void queueMessages(PullResult data) {
143: Message _mesg = getMessageFactory().newMessage(
144: (StructuredEvent) data.data_, this );
145:
146: checkMessageProperties(_mesg);
147:
148: processMessage(_mesg);
149: }
150:
151: public void runPullMessage() throws Disconnected {
152: pullMessagesOperation_.runPull();
153: }
154: }
|