001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import org.apache.avalon.framework.configuration.Configuration;
024: import org.jacorb.notification.MessageFactory;
025: import org.jacorb.notification.OfferManager;
026: import org.jacorb.notification.SubscriptionManager;
027: import org.jacorb.notification.conf.Attributes;
028: import org.jacorb.notification.conf.Default;
029: import org.jacorb.notification.engine.TaskProcessor;
030: import org.jacorb.notification.interfaces.Message;
031: import org.jacorb.notification.interfaces.MessageSupplier;
032: import org.omg.CORBA.Any;
033: import org.omg.CORBA.BooleanHolder;
034: import org.omg.CORBA.ORB;
035: import org.omg.CosEventChannelAdmin.AlreadyConnected;
036: import org.omg.CosEventComm.Disconnected;
037: import org.omg.CosEventComm.PullSupplier;
038: import org.omg.CosNotifyChannelAdmin.ProxyPullConsumerOperations;
039: import org.omg.CosNotifyChannelAdmin.ProxyPullConsumerPOATie;
040: import org.omg.CosNotifyChannelAdmin.ProxyType;
041: import org.omg.PortableServer.POA;
042: import org.omg.PortableServer.Servant;
043:
044: /**
045: * @jmx.mbean extends = "AbstractProxyConsumerMBean"
046: * @jboss.xmbean
047: *
048: * @author Alphonse Bendt
049: * @version $Id: ProxyPullConsumerImpl.java,v 1.16 2006/01/12 22:34:54 alphonse.bendt Exp $
050: */
051:
052: public class ProxyPullConsumerImpl extends AbstractProxyConsumer
053: implements ProxyPullConsumerOperations, MessageSupplier,
054: MessageSupplierDelegate, ProxyPullConsumerImplMBean {
055: /**
056: * the connected PullSupplier
057: */
058: private PullSupplier pullSupplier_;
059:
060: private long pollInterval_;
061:
062: private final PullMessagesOperation pullMessagesOperation_;
063:
064: private final PullMessagesUtility pollTaskUtility_;
065:
066: // //////////////////////////////////////
067:
068: public ProxyPullConsumerImpl(IAdmin admin, ORB orb, POA poa,
069: Configuration conf, TaskProcessor taskProcessor,
070: MessageFactory messageFactory, OfferManager offerManager,
071: SubscriptionManager subscriptionManager) {
072: super (admin, orb, poa, conf, taskProcessor, messageFactory,
073: null, offerManager, subscriptionManager);
074:
075: pollInterval_ = conf.getAttributeAsLong(
076: Attributes.PULL_CONSUMER_POLL_INTERVAL,
077: Default.DEFAULT_PULL_CONSUMER_POLL_INTERVAL);
078:
079: pullMessagesOperation_ = new PullMessagesOperation(this );
080:
081: pollTaskUtility_ = new PullMessagesUtility(taskProcessor, this );
082: }
083:
084: // //////////////////////////////////////
085:
086: public ProxyType MyType() {
087: return ProxyType.PULL_ANY;
088: }
089:
090: public void disconnect_pull_consumer() {
091: destroy();
092: }
093:
094: protected void disconnectClient() {
095: stopTask();
096:
097: pullSupplier_.disconnect_pull_supplier();
098:
099: pullSupplier_ = null;
100: }
101:
102: protected void connectionSuspended() {
103: stopTask();
104: }
105:
106: protected void connectionResumed() {
107: startTask();
108: }
109:
110: public void runPullMessage() throws Disconnected {
111: pullMessagesOperation_.runPull();
112: }
113:
114: public void connect_any_pull_supplier(PullSupplier pullSupplier)
115: throws AlreadyConnected {
116: checkIsNotConnected();
117:
118: pullSupplier_ = pullSupplier;
119:
120: connectClient(pullSupplier);
121:
122: startTask();
123: }
124:
125: private synchronized void startTask() {
126: pollTaskUtility_.startTask(pollInterval_);
127: }
128:
129: private synchronized void stopTask() {
130: pollTaskUtility_.stopTask();
131: }
132:
133: public Servant newServant() {
134: return new ProxyPullConsumerPOATie(this );
135: }
136:
137: // //////////////////////////////////////
138: // todo collect management informations
139:
140: public long getPollInterval() {
141: return pollInterval_;
142: }
143:
144: public long getPullTimer() {
145: return pullMessagesOperation_.getTimeSpentInPull();
146: }
147:
148: public int getPullCounter() {
149: return pullMessagesOperation_.getPullCounter();
150: }
151:
152: public int getSuccessfulPullCounter() {
153: return pullMessagesOperation_.getSuccessfulPullCounter();
154: }
155:
156: public MessageSupplierDelegate.PullResult pullMessages()
157: throws Disconnected {
158: BooleanHolder _hasEvent = new BooleanHolder();
159: Any _event = pullSupplier_.try_pull(_hasEvent);
160:
161: return new MessageSupplierDelegate.PullResult(_event,
162: _hasEvent.value);
163: }
164:
165: public void queueMessages(PullResult data) {
166: Message _message = getMessageFactory().newMessage(
167: (Any) data.data_, this);
168:
169: checkMessageProperties(_message);
170:
171: processMessage(_message);
172: }
173: }
|