001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import java.util.List;
024:
025: import org.apache.avalon.framework.configuration.Configuration;
026: import org.apache.avalon.framework.configuration.ConfigurationException;
027: import org.jacorb.notification.OfferManager;
028: import org.jacorb.notification.SubscriptionManager;
029: import org.jacorb.notification.conf.Attributes;
030: import org.jacorb.notification.conf.Default;
031: import org.jacorb.notification.engine.TaskProcessor;
032: import org.jacorb.notification.interfaces.Message;
033: import org.jacorb.notification.interfaces.MessageConsumer;
034: import org.jacorb.notification.queue.EventQueueFactory;
035: import org.jacorb.notification.queue.MessageQueue;
036: import org.jacorb.notification.queue.MessageQueueAdapter;
037: import org.jacorb.notification.queue.RWLockEventQueueDecorator;
038: import org.jacorb.notification.util.CollectionsWrapper;
039: import org.jacorb.notification.util.PropertySet;
040: import org.jacorb.notification.util.PropertySetAdapter;
041: import org.omg.CORBA.Any;
042: import org.omg.CORBA.NO_IMPLEMENT;
043: import org.omg.CORBA.ORB;
044: import org.omg.CosNotification.DiscardPolicy;
045: import org.omg.CosNotification.EventType;
046: import org.omg.CosNotification.MaxEventsPerConsumer;
047: import org.omg.CosNotification.OrderPolicy;
048: import org.omg.CosNotification.Property;
049: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
050: import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
051: import org.omg.CosNotifyComm.InvalidEventType;
052: import org.omg.CosNotifyComm.NotifyPublish;
053: import org.omg.CosNotifyComm.NotifyPublishHelper;
054: import org.omg.CosNotifyComm.NotifyPublishOperations;
055: import org.omg.CosNotifyComm.NotifySubscribeOperations;
056: import org.omg.PortableServer.POA;
057:
058: /**
059: * Abstract base class for ProxySuppliers. This class provides base functionality
060: * for the different ProxySuppliers:
061: * <ul>
062: * <li>queue management,
063: * <li>error threshold settings.
064: * </ul>
065: *
066: * @jmx.mbean extends = "AbstractProxyMBean"
067: * @jboss.xmbean
068: *
069: * @--jmx.notification name = "notification.proxy.message_discarded"
070: * description = "queue overflow causes messages to be discarded"
071: * notificationType = "java.lang.String"
072: *
073: * @author Alphonse Bendt
074: * @version $Id: AbstractProxySupplier.java,v 1.31 2006/05/23 10:50:35 alphonse.bendt Exp $
075: */
076:
077: public abstract class AbstractProxySupplier extends AbstractProxy
078: implements MessageConsumer, NotifySubscribeOperations,
079: AbstractProxySupplierMBean {
080: private static final String EVENT_MESSAGE_DISCARDED = "notification.proxy.message_discarded";
081:
082: private int numberOfDiscardedMessages_ = 0;
083:
084: private MessageQueue.DiscardListener discardListener_ = new MessageQueue.DiscardListener() {
085: private long sendTimestamp_;
086: private int discardedMessagesSinceLastBroadcast_ = 1;
087:
088: public void messageDiscarded(int maxSize) {
089: numberOfDiscardedMessages_++;
090:
091: // max. one notification every five second
092: if (!((System.currentTimeMillis() - sendTimestamp_) < 5000)) {
093: sendNotification(
094: EVENT_MESSAGE_DISCARDED,
095: discardedMessagesSinceLastBroadcast_
096: + " Message(s) discarded. Queue Limit: "
097: + maxSize);
098: sendTimestamp_ = System.currentTimeMillis();
099: discardedMessagesSinceLastBroadcast_ = 1;
100:
101: if (logger_.isInfoEnabled()) {
102: logger_.info(discardedMessagesSinceLastBroadcast_
103: + " Message(s) discarded. Queue Limit: "
104: + maxSize);
105: }
106: } else {
107: ++discardedMessagesSinceLastBroadcast_;
108: }
109: }
110: };
111:
112: private static final Runnable EMPTY_RUNNABLE = new Runnable() {
113: public void run() {
114: // no operation
115: }
116: };
117:
118: private static final EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0];
119:
120: private static final Message[] EMPTY_MESSAGE = new Message[0];
121:
122: // //////////////////////////////////////
123:
124: private final RWLockEventQueueDecorator pendingMessages_;
125:
126: private final int errorThreshold_;
127:
128: private final ConsumerAdmin consumerAdmin_;
129:
130: private final EventQueueFactory eventQueueFactory_;
131:
132: private NotifyPublishOperations proxyOfferListener_;
133:
134: private NotifyPublish offerListener_;
135:
136: // //////////////////////////////////////
137:
138: protected AbstractProxySupplier(IAdmin admin, ORB orb, POA poa,
139: Configuration conf, TaskProcessor taskProcessor,
140: OfferManager offerManager,
141: SubscriptionManager subscriptionManager,
142: ConsumerAdmin consumerAdmin) throws ConfigurationException {
143: super (admin, orb, poa, conf, taskProcessor, offerManager,
144: subscriptionManager);
145:
146: consumerAdmin_ = consumerAdmin;
147:
148: eventQueueFactory_ = new EventQueueFactory(conf);
149:
150: errorThreshold_ = conf.getAttributeAsInteger(
151: Attributes.EVENTCONSUMER_ERROR_THRESHOLD,
152: Default.DEFAULT_EVENTCONSUMER_ERROR_THRESHOLD);
153:
154: if (logger_.isInfoEnabled()) {
155: logger_.info("set Error Threshold to : " + errorThreshold_);
156: }
157:
158: qosSettings_.addPropertySetListener(new String[] {
159: OrderPolicy.value, DiscardPolicy.value,
160: MaxEventsPerConsumer.value },
161: eventQueueConfigurationChangedCB);
162:
163: final MessageQueueAdapter initialEventQueue = getMessageQueueFactory()
164: .newMessageQueue(qosSettings_);
165:
166: pendingMessages_ = new RWLockEventQueueDecorator(
167: initialEventQueue);
168:
169: pendingMessages_.addDiscardListener(discardListener_);
170:
171: eventTypes_.add(EVENT_MESSAGE_DISCARDED);
172: }
173:
174: // //////////////////////////////////////
175:
176: protected EventQueueFactory getMessageQueueFactory() {
177: return eventQueueFactory_;
178: }
179:
180: /**
181: * configure pending messages queue. the queue is reconfigured according to the current QoS
182: * Settings. the contents of the queue are reorganized according to the new OrderPolicy.
183: */
184: private final void configureEventQueue() {
185: final MessageQueueAdapter _newQueue = getMessageQueueFactory()
186: .newMessageQueue(qosSettings_);
187:
188: try {
189: pendingMessages_.replaceDelegate(_newQueue);
190: } catch (InterruptedException e) {
191: // ignored
192: }
193: }
194:
195: private PropertySetAdapter eventQueueConfigurationChangedCB = new PropertySetAdapter() {
196: public void actionPropertySetChanged(PropertySet source) {
197: configureEventQueue();
198: }
199: };
200:
201: /**
202: * @jmx.managed-attribute description = "Number of Pending Messages"
203: * access = "read-only"
204: */
205: public int getPendingMessagesCount() {
206: try {
207: return pendingMessages_.getPendingMessagesCount();
208: } catch (InterruptedException e) {
209: return -1;
210: }
211: }
212:
213: /**
214: * @jmx.managed-attribute description = "current OrderPolicy"
215: * access = "read-only"
216: */
217: public final String getOrderPolicy() {
218: return pendingMessages_.getOrderPolicyName();
219: }
220:
221: /**
222: * @jmx.managed-attribute description = "current DiscardPolicy"
223: * access = "read-only"
224: */
225: public final String getDiscardPolicy() {
226: return pendingMessages_.getDiscardPolicyName();
227: }
228:
229: /**
230: * @jmx.managed-attribute description = "maximum number of events that may be queued per consumer"
231: * access = "read-write"
232: */
233: public final int getMaxEventsPerConsumer() {
234: return qosSettings_.get(MaxEventsPerConsumer.value)
235: .extract_long();
236: }
237:
238: /**
239: * @jmx.managed-attribute access = "read-write"
240: */
241: public void setMaxEventsPerConsumer(int max) {
242: final Any any = getORB().create_any();
243: any.insert_long(max);
244: final Property prop = new Property(MaxEventsPerConsumer.value,
245: any);
246: qosSettings_.set_qos(new Property[] { prop });
247: }
248:
249: /**
250: * @jmx.managed-attribute access = "read-only"
251: */
252: public int getNumberOfDiscardedMessages() {
253: return numberOfDiscardedMessages_;
254: }
255:
256: public boolean hasPendingData() {
257: try {
258: return pendingMessages_.hasPendingMessages();
259: } catch (InterruptedException e) {
260: return false;
261: }
262: }
263:
264: /**
265: * put a copy of the Message in the queue of pending Messages.
266: *
267: * @param message
268: * the <code>Message</code> to queue.
269: */
270: protected void enqueue(Message message) {
271: Message _copy = (Message) message.clone();
272:
273: try {
274: pendingMessages_.enqeue(_copy);
275:
276: if (logger_.isDebugEnabled()) {
277: logger_.debug("enqueue " + message
278: + " to pending Messages.");
279: }
280: } catch (InterruptedException e) {
281: _copy.dispose();
282: logger_.info("enqueue was interrupted", e);
283: }
284: }
285:
286: public Message getMessageBlocking() throws InterruptedException {
287: return pendingMessages_.getMessageBlocking();
288: }
289:
290: protected Message getMessageNoBlock() {
291: try {
292: return pendingMessages_.getMessageNoBlock();
293: } catch (InterruptedException e) {
294: Thread.currentThread().interrupt();
295:
296: return null;
297: }
298: }
299:
300: protected Message[] getAllMessages() {
301: try {
302: return pendingMessages_.getAllMessages();
303: } catch (InterruptedException e) {
304: Thread.currentThread().interrupt();
305:
306: return EMPTY_MESSAGE;
307: }
308: }
309:
310: public void queueMessage(final Message message) {
311: if (logger_.isDebugEnabled()) {
312: logger_.debug("queueMessage() connected=" + getConnected()
313: + " suspended=" + isSuspended());
314: }
315:
316: if (getConnected()) {
317: enqueue(message);
318:
319: messageQueued();
320: }
321: }
322:
323: /**
324: * this is an extension point.
325: */
326: protected void messageQueued() {
327: // no operation
328: }
329:
330: /**
331: * @param max maximum number of messages
332: * @return an array containing at most max Messages
333: */
334: protected Message[] getUpToMessages(int max) {
335: try {
336: return pendingMessages_.getUpToMessages(max);
337: } catch (InterruptedException e) {
338: Thread.currentThread().interrupt();
339:
340: return EMPTY_MESSAGE;
341: }
342: }
343:
344: /**
345: * @param min
346: * minimum number of messages
347: * @return an array containing the requested number of Messages or null
348: */
349: protected Message[] getAtLeastMessages(int min) {
350: try {
351: return pendingMessages_.getAtLeastMessages(min);
352: } catch (InterruptedException e) {
353: Thread.currentThread().interrupt();
354:
355: return EMPTY_MESSAGE;
356: }
357: }
358:
359: public int getErrorThreshold() {
360: return errorThreshold_;
361: }
362:
363: public final void dispose() {
364: super .dispose();
365:
366: pendingMessages_.clear();
367:
368: // insert an empty command into the taskProcessor's queue.
369: // otherwise queue seems to contain old entries that prevent GC'ing
370: getTaskProcessor().executeTaskAfterDelay(1000, EMPTY_RUNNABLE);
371: }
372:
373: public final ConsumerAdmin MyAdmin() {
374: return consumerAdmin_;
375: }
376:
377: public final void subscription_change(EventType[] added,
378: EventType[] removed) throws InvalidEventType {
379: subscriptionManager_.subscription_change(added, removed);
380: }
381:
382: public final EventType[] obtain_offered_types(
383: ObtainInfoMode obtainInfoMode) {
384: EventType[] _offeredTypes = EMPTY_EVENT_TYPE_ARRAY;
385:
386: switch (obtainInfoMode.value()) {
387: case ObtainInfoMode._ALL_NOW_UPDATES_ON:
388: registerListener();
389: _offeredTypes = offerManager_.obtain_offered_types();
390: break;
391: case ObtainInfoMode._ALL_NOW_UPDATES_OFF:
392: _offeredTypes = offerManager_.obtain_offered_types();
393: removeListener();
394: break;
395: case ObtainInfoMode._NONE_NOW_UPDATES_ON:
396: registerListener();
397: break;
398: case ObtainInfoMode._NONE_NOW_UPDATES_OFF:
399: removeListener();
400: break;
401: default:
402: throw new IllegalArgumentException("Illegal ObtainInfoMode");
403: }
404:
405: return _offeredTypes;
406: }
407:
408: private void registerListener() {
409: if (proxyOfferListener_ == null) {
410: final NotifyPublishOperations _listener = getOfferListener();
411:
412: if (_listener != null) {
413: proxyOfferListener_ = new NotifyPublishOperations() {
414: public void offer_change(EventType[] added,
415: EventType[] removed) {
416: try {
417: _listener.offer_change(added, removed);
418: } catch (NO_IMPLEMENT e) {
419: logger_
420: .info(
421: "disable offer_change for connected Consumer.",
422: e);
423:
424: removeListener();
425: } catch (InvalidEventType e) {
426: logger_.warn("invalid event type", e);
427: } catch (Exception e) {
428: logger_.warn("offer_change failed", e);
429: }
430: }
431: };
432:
433: offerManager_.addListener(proxyOfferListener_);
434: }
435: }
436: }
437:
438: protected void removeListener() {
439: if (proxyOfferListener_ != null) {
440: offerManager_.removeListener(proxyOfferListener_);
441: proxyOfferListener_ = null;
442: }
443: }
444:
445: final NotifyPublishOperations getOfferListener() {
446: return offerListener_;
447: }
448:
449: protected final void clientDisconnected() {
450: offerListener_ = null;
451: }
452:
453: public void connectClient(org.omg.CORBA.Object client) {
454: super .connectClient(client);
455:
456: try {
457: offerListener_ = NotifyPublishHelper.narrow(client);
458:
459: logger_
460: .debug("successfully narrowed connecting Client to IF NotifyPublish");
461: } catch (Exception t) {
462: logger_
463: .info("disable offer_change for connecting Consumer");
464: }
465: }
466:
467: public boolean isRetryAllowed() {
468: return !isDestroyed()
469: && getErrorCounter() < getErrorThreshold();
470: }
471:
472: protected abstract long getCost();
473:
474: public int compareTo(Object o) {
475: AbstractProxySupplier other = (AbstractProxySupplier) o;
476:
477: return (int) (getCost() - other.getCost());
478: }
479:
480: public final boolean hasMessageConsumer() {
481: return true;
482: }
483:
484: public final List getSubsequentFilterStages() {
485: return CollectionsWrapper.singletonList(this );
486: }
487:
488: public final MessageConsumer getMessageConsumer() {
489: return this ;
490: }
491:
492: /**
493: * @jmx.managed-operation impact = "ACTION"
494: * description = "delete all queued Messages"
495: */
496: public void clearPendingMessageQueue() {
497: pendingMessages_.clear();
498: }
499: }
|