001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2004 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import org.apache.avalon.framework.configuration.Configuration;
025: import org.jacorb.notification.MessageFactory;
026: import org.jacorb.notification.OfferManager;
027: import org.jacorb.notification.SubscriptionManager;
028: import org.jacorb.notification.conf.Attributes;
029: import org.jacorb.notification.conf.Default;
030: import org.jacorb.notification.engine.TaskProcessor;
031: import org.jacorb.notification.interfaces.Message;
032: import org.jacorb.notification.interfaces.MessageSupplier;
033: import org.omg.CORBA.BooleanHolder;
034: import org.omg.CORBA.ORB;
035: import org.omg.CosEventChannelAdmin.AlreadyConnected;
036: import org.omg.CosEventComm.Disconnected;
037: import org.omg.CosNotification.StructuredEvent;
038: import org.omg.CosNotifyChannelAdmin.ProxyType;
039: import org.omg.CosNotifyChannelAdmin.SequenceProxyPullConsumerOperations;
040: import org.omg.CosNotifyChannelAdmin.SequenceProxyPullConsumerPOATie;
041: import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
042: import org.omg.CosNotifyComm.SequencePullSupplier;
043: import org.omg.PortableServer.POA;
044: import org.omg.PortableServer.Servant;
045:
046: /**
047: * @jmx.mbean extends = "AbstractProxyConsumerMBean"
048: * @jboss.xmbean
049: *
050: * @author Alphonse Bendt
051: * @version $Id: SequenceProxyPullConsumerImpl.java,v 1.15 2006/01/12 22:34:54 alphonse.bendt Exp $
052: */
053:
054: public class SequenceProxyPullConsumerImpl extends
055: AbstractProxyConsumer implements
056: SequenceProxyPullConsumerOperations,
057: SequenceProxyPullConsumerImplMBean, MessageSupplier,
058: MessageSupplierDelegate {
059: private SequencePullSupplier sequencePullSupplier_;
060:
061: private final PullMessagesUtility pollTaskUtility_;
062:
063: private final long pollInterval_;
064:
065: private final PullMessagesOperation pullMessagesOperation_;
066:
067: // //////////////////////////////////////
068:
069: public SequenceProxyPullConsumerImpl(IAdmin admin, ORB orb,
070: POA poa, Configuration config, TaskProcessor taskProcessor,
071: MessageFactory messageFactory, OfferManager offerManager,
072: SubscriptionManager subscriptionManager,
073: SupplierAdmin supplierAdmin) {
074: super (admin, orb, poa, config, taskProcessor, messageFactory,
075: supplierAdmin, offerManager, subscriptionManager);
076:
077: pollInterval_ = config.getAttributeAsLong(
078: Attributes.PULL_CONSUMER_POLL_INTERVAL,
079: Default.DEFAULT_PULL_CONSUMER_POLL_INTERVAL);
080:
081: pollTaskUtility_ = new PullMessagesUtility(taskProcessor, this );
082:
083: pullMessagesOperation_ = new PullMessagesOperation(this );
084: }
085:
086: public ProxyType MyType() {
087: return ProxyType.PULL_SEQUENCE;
088: }
089:
090: public void disconnect_sequence_pull_consumer() {
091: destroy();
092: }
093:
094: public synchronized void connect_sequence_pull_supplier(
095: SequencePullSupplier sequencePullSupplier)
096: throws AlreadyConnected {
097: checkIsNotConnected();
098:
099: sequencePullSupplier_ = sequencePullSupplier;
100:
101: connectClient(sequencePullSupplier);
102:
103: pollTaskUtility_.startTask(pollInterval_);
104: }
105:
106: protected void disconnectClient() {
107: pollTaskUtility_.stopTask();
108: sequencePullSupplier_.disconnect_sequence_pull_supplier();
109: sequencePullSupplier_ = null;
110: }
111:
112: public Servant newServant() {
113: return new SequenceProxyPullConsumerPOATie(this );
114: }
115:
116: public MessageSupplierDelegate.PullResult pullMessages()
117: throws Disconnected {
118: BooleanHolder _hasEvent = new BooleanHolder();
119: _hasEvent.value = false;
120: StructuredEvent[] _events = sequencePullSupplier_
121: .try_pull_structured_events(1, _hasEvent);
122:
123: return new MessageSupplierDelegate.PullResult(_events,
124: _hasEvent.value);
125: }
126:
127: public void queueMessages(PullResult pullResult) {
128: StructuredEvent[] _events = (StructuredEvent[]) pullResult.data_;
129: Message[] _messages = newMessages(_events);
130:
131: for (int x = 0; x < _messages.length; ++x) {
132: processMessage(_messages[x]);
133: }
134: }
135:
136: public void runPullMessage() throws Disconnected {
137: pullMessagesOperation_.runPull();
138: }
139: }
|