001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2004 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import org.apache.avalon.framework.configuration.Configuration;
025: import org.apache.avalon.framework.configuration.ConfigurationException;
026: import org.jacorb.notification.OfferManager;
027: import org.jacorb.notification.SubscriptionManager;
028: import org.jacorb.notification.engine.PushOperation;
029: import org.jacorb.notification.engine.PushTaskExecutorFactory;
030: import org.jacorb.notification.engine.TaskProcessor;
031: import org.jacorb.notification.interfaces.Message;
032: import org.jacorb.notification.util.PropertySet;
033: import org.jacorb.notification.util.PropertySetAdapter;
034: import org.omg.CORBA.ORB;
035: import org.omg.CosEventChannelAdmin.AlreadyConnected;
036: import org.omg.CosEventChannelAdmin.TypeError;
037: import org.omg.CosEventComm.Disconnected;
038: import org.omg.CosNotification.MaximumBatchSize;
039: import org.omg.CosNotification.PacingInterval;
040: import org.omg.CosNotification.StructuredEvent;
041: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
042: import org.omg.CosNotifyChannelAdmin.ProxyType;
043: import org.omg.CosNotifyChannelAdmin.SequenceProxyPushSupplierOperations;
044: import org.omg.CosNotifyChannelAdmin.SequenceProxyPushSupplierPOATie;
045: import org.omg.CosNotifyComm.SequencePushConsumer;
046: import org.omg.PortableServer.POA;
047: import org.omg.PortableServer.Servant;
048: import org.omg.TimeBase.TimeTHelper;
049:
050: import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
051: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
052: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
053: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
054:
055: /**
056: * @jmx.mbean extends = "AbstractProxyPushSupplierMBean"
057: * @jboss.xmbean
058: *
059: * @author Alphonse Bendt
060: * @version $Id: SequenceProxyPushSupplierImpl.java,v 1.27 2006/03/07 19:23:47 alphonse.bendt Exp $
061: */
062:
063: public class SequenceProxyPushSupplierImpl extends
064: AbstractProxyPushSupplier implements
065: SequenceProxyPushSupplierOperations,
066: SequenceProxyPushSupplierImplMBean {
067: private class PushSequenceOperation implements PushOperation {
068: private final StructuredEvent[] structuredEvents_;
069:
070: public PushSequenceOperation(StructuredEvent[] structuredEvents) {
071: structuredEvents_ = structuredEvents;
072: }
073:
074: public void invokePush() throws Disconnected {
075: deliverPendingMessagesInternal(structuredEvents_);
076: }
077:
078: public void dispose() {
079: // nothing to do
080: }
081: }
082:
083: public SequenceProxyPushSupplierImpl(IAdmin admin, ORB orb,
084: POA poa, Configuration config, TaskProcessor taskProcessor,
085: PushTaskExecutorFactory pushTaskExecutorFactory,
086: OfferManager offerManager,
087: SubscriptionManager subscriptionManager,
088: ConsumerAdmin consumerAdmin) throws ConfigurationException {
089: super (admin, orb, poa, config, taskProcessor,
090: pushTaskExecutorFactory, offerManager,
091: subscriptionManager, consumerAdmin);
092:
093: configureMaxBatchSize();
094:
095: configurePacingInterval();
096:
097: timerTask_ = new Runnable() {
098: public void run() {
099: if (isEnabled()) {
100: scheduleFlush();
101: }
102: }
103: };
104:
105: qosSettings_.addPropertySetListener(MaximumBatchSize.value,
106: new PropertySetAdapter() {
107: public void actionPropertySetChanged(
108: PropertySet source) {
109: configureMaxBatchSize();
110: }
111: });
112:
113: qosSettings_.addPropertySetListener(PacingInterval.value,
114: new PropertySetAdapter() {
115: public void actionPropertySetChanged(
116: PropertySet source) {
117: configurePacingInterval();
118: }
119: });
120: }
121:
122: private final Runnable timerTask_;
123:
124: /**
125: * The connected SequencePushConsumer.
126: */
127: private SequencePushConsumer sequencePushConsumer_;
128:
129: /**
130: * registration for the Scheduled DeliverTask.
131: */
132: private ScheduledFuture taskId_;
133:
134: /**
135: * maximum queue size before a delivery is forced.
136: */
137: private final AtomicInteger maxBatchSize_ = new AtomicInteger(1);
138:
139: /**
140: * how long to wait between two scheduled deliveries.
141: * (0 equals no scheduled deliveries).
142: */
143: private final AtomicLong pacingInterval_ = new AtomicLong(0);
144:
145: private long timeSpent_ = 0;
146:
147: public ProxyType MyType() {
148: return ProxyType.PUSH_SEQUENCE;
149: }
150:
151: public boolean pushEvent() {
152: final Message[] _messages = getAtLeastMessages(maxBatchSize_
153: .get());
154:
155: return pushMessages(_messages);
156: }
157:
158: public void flushPendingEvents() {
159: while (true) {
160: try {
161: final boolean _acquired = pushSync_.tryAcquire(1000,
162: TimeUnit.MILLISECONDS);
163:
164: if (_acquired) {
165: try {
166: final Message[] _messages = getUpToMessages(maxBatchSize_
167: .get());
168:
169: if (_messages == null) {
170: break;
171: }
172:
173: if (_messages.length == 0) {
174: break;
175: }
176:
177: final boolean _success = pushMessages(_messages);
178:
179: if (!_success) {
180: break;
181: }
182: } finally {
183: pushSync_.release();
184: }
185: }
186: } catch (InterruptedException e) {
187: break;
188: }
189: }
190: }
191:
192: private boolean pushMessages(final Message[] messages) {
193: if (messages == null) {
194: return false;
195: }
196:
197: if (messages.length == 0) {
198: return false;
199: }
200:
201: final StructuredEvent[] _structuredEvents = new StructuredEvent[messages.length];
202:
203: for (int x = 0; x < messages.length; ++x) {
204: _structuredEvents[x] = messages[x].toStructuredEvent();
205:
206: messages[x].dispose();
207: }
208:
209: try {
210: deliverPendingMessagesInternal(_structuredEvents);
211:
212: return true;
213: } catch (Exception e) {
214: final PushSequenceOperation _failedOperation = new PushSequenceOperation(
215: _structuredEvents);
216:
217: handleFailedPushOperation(_failedOperation, e);
218:
219: return false;
220: }
221: }
222:
223: private void deliverPendingMessagesInternal(
224: final StructuredEvent[] structuredEvents)
225: throws Disconnected {
226: long now = System.currentTimeMillis();
227: sequencePushConsumer_.push_structured_events(structuredEvents);
228: timeSpent_ += (System.currentTimeMillis() - now);
229: resetErrorCounter();
230: }
231:
232: public void connect_sequence_push_consumer(
233: SequencePushConsumer consumer) throws AlreadyConnected,
234: TypeError {
235: logger_.debug("connect_sequence_push_consumer");
236:
237: checkIsNotConnected();
238:
239: sequencePushConsumer_ = consumer;
240:
241: connectClient(consumer);
242:
243: startTimerTask();
244: }
245:
246: protected void connectionResumed() {
247: scheduleFlush();
248:
249: startTimerTask();
250: }
251:
252: protected void connectionSuspended() {
253: stopTimerTask();
254: }
255:
256: public void disconnect_sequence_push_supplier() {
257: destroy();
258: }
259:
260: protected void disconnectClient() {
261: stopTimerTask();
262:
263: sequencePushConsumer_.disconnect_sequence_push_consumer();
264: sequencePushConsumer_ = null;
265: }
266:
267: private void startTimerTask() {
268: if (pacingInterval_.get() > 0 && taskId_ == null) {
269: final long _interval = timeT2millis();
270: taskId_ = getTaskProcessor().executeTaskPeriodically(
271: _interval, timerTask_, true);
272: }
273: }
274:
275: public long timeT2millis() {
276: final long timeT = pacingInterval_.get();
277: return time2millis(timeT);
278: }
279:
280: public static long time2millis(final long timeT) {
281: return timeT / 10000;
282: }
283:
284: synchronized private void stopTimerTask() {
285: if (taskId_ != null) {
286: taskId_.cancel(true);
287: taskId_ = null;
288: }
289: }
290:
291: private void checkTimerTask() {
292: if (getConnected() && pacingInterval_.get() > 0) {
293: stopTimerTask();
294:
295: startTimerTask();
296: } else {
297: stopTimerTask();
298: }
299: }
300:
301: private boolean configurePacingInterval() {
302: if (qosSettings_.containsKey(PacingInterval.value)) {
303: long _pacingInterval = TimeTHelper.extract(qosSettings_
304: .get(PacingInterval.value));
305:
306: if (pacingInterval_.get() != _pacingInterval) {
307: if (logger_.isInfoEnabled()) {
308: logger_.info("set PacingInterval="
309: + _pacingInterval);
310: }
311: pacingInterval_.set(_pacingInterval);
312:
313: checkTimerTask();
314:
315: return true;
316: }
317: }
318: return false;
319: }
320:
321: private boolean configureMaxBatchSize() {
322: if (qosSettings_.containsKey(MaximumBatchSize.value)) {
323: int _maxBatchSize = qosSettings_
324: .get(MaximumBatchSize.value).extract_long();
325:
326: if (maxBatchSize_.get() != _maxBatchSize) {
327: if (logger_.isInfoEnabled()) {
328: logger_.info("set MaxBatchSize=" + _maxBatchSize);
329: }
330:
331: maxBatchSize_.set(_maxBatchSize);
332:
333: return true;
334: }
335: }
336:
337: return false;
338: }
339:
340: public Servant newServant() {
341: return new SequenceProxyPushSupplierPOATie(this );
342: }
343:
344: protected long getCost() {
345: return timeSpent_;
346: }
347: }
|