001: package org.jacorb.notification;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import java.lang.ref.WeakReference;
024: import java.util.HashMap;
025: import java.util.Iterator;
026: import java.util.List;
027: import java.util.Map;
028:
029: import org.apache.avalon.framework.configuration.Configuration;
030: import org.apache.avalon.framework.logger.Logger;
031: import org.jacorb.notification.interfaces.Disposable;
032: import org.jacorb.notification.interfaces.FilterStage;
033: import org.jacorb.notification.interfaces.FilterStageSource;
034: import org.jacorb.notification.interfaces.JMXManageable;
035: import org.jacorb.notification.interfaces.ProxyEvent;
036: import org.jacorb.notification.interfaces.ProxyEventAdapter;
037: import org.jacorb.notification.interfaces.ProxyEventListener;
038: import org.jacorb.notification.lifecycle.IServantLifecyle;
039: import org.jacorb.notification.lifecycle.ServantLifecyleControl;
040: import org.jacorb.notification.servant.AbstractAdmin;
041: import org.jacorb.notification.servant.AbstractSupplierAdmin;
042: import org.jacorb.notification.servant.FilterStageListManager;
043: import org.jacorb.notification.util.AdminPropertySet;
044: import org.jacorb.notification.util.DisposableManager;
045: import org.jacorb.notification.util.PropertySet;
046: import org.jacorb.notification.util.QoSPropertySet;
047: import org.omg.CORBA.Any;
048: import org.omg.CORBA.IntHolder;
049: import org.omg.CORBA.OBJECT_NOT_EXIST;
050: import org.omg.CORBA.ORB;
051: import org.omg.CosNotification.EventReliability;
052: import org.omg.CosNotification.MaxConsumers;
053: import org.omg.CosNotification.MaxSuppliers;
054: import org.omg.CosNotification.NamedPropertyRangeSeqHolder;
055: import org.omg.CosNotification.Property;
056: import org.omg.CosNotification.UnsupportedAdmin;
057: import org.omg.CosNotification.UnsupportedQoS;
058: import org.omg.CosNotifyChannelAdmin.AdminLimit;
059: import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
060: import org.omg.CosNotifyChannelAdmin.AdminNotFound;
061: import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator;
062: import org.omg.CosNotifyFilter.FilterFactory;
063: import org.omg.PortableServer.POA;
064: import org.picocontainer.MutablePicoContainer;
065:
066: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
067: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
068:
069: /**
070: * @jmx.mbean
071: * @jboss.xmbean
072: *
073: * @author Alphonse Bendt
074: * @version $Id: AbstractEventChannel.java,v 1.14 2006/05/23 10:22:20 alphonse.bendt Exp $
075: */
076:
077: public abstract class AbstractEventChannel implements IServantLifecyle,
078: JMXManageable {
079: /**
080: * This key is reserved for the default supplier admin and the default consumer admin.
081: */
082: private static final Integer DEFAULT_ADMIN_KEY = new Integer(0);
083:
084: private final DisposableManager disposables_ = new DisposableManager();
085:
086: protected final Logger logger_;
087:
088: protected final ORB orb_;
089:
090: private final POA poa_;
091:
092: private final Configuration configuration_;
093:
094: /**
095: * max number of Suppliers that may be connected at a time to this Channel (0=unlimited)
096: */
097: private final AtomicInteger maxNumberOfSuppliers_ = new AtomicInteger(
098: 0);
099:
100: /**
101: * max number of Consumers that may be connected at a time to this Channel (0=unlimited)
102: */
103: private final AtomicInteger maxNumberOfConsumers_ = new AtomicInteger(
104: 0);
105:
106: private final AdminPropertySet adminSettings_;
107:
108: private final QoSPropertySet qosSettings_;
109:
110: private final FilterStageListManager listManager_;
111:
112: private final FilterFactory defaultFilterFactory_;
113:
114: /**
115: * lock variable used to access allConsumerAdmins_ and consumerAdminServants_.
116: */
117: private final Object modifyConsumerAdminsLock_ = new Object();
118:
119: /**
120: * lock variable used to access allConsumerAdmins_.
121: */
122: private final Object modifySupplierAdminsLock_ = new Object();
123:
124: /**
125: * maps id's to ConsumerAdminServants (notify style).
126: */
127: private final Map consumerAdminServants_ = new HashMap();
128:
129: /**
130: * maps id's to SupplierAdminServants (notify style).
131: */
132: private final Map supplierAdminServants_ = new HashMap();
133:
134: /**
135: * pool of available ID's for Admin Objects. The Pool is used for Consumer and Supplier Admins.
136: * NOTE: The least available ID is 1 as the ID 0 has a special meaning.
137: *
138: * @see #DEFAULT_ADMIN_KEY DEFAULT_ADMIN_KEY.
139: */
140: private final AtomicInteger adminIdPool_ = new AtomicInteger(1);
141:
142: /**
143: * number of Consumers that are connected to this Channel
144: */
145: private final AtomicInteger numberOfConsumers_ = new AtomicInteger(
146: 0);
147:
148: /**
149: * number of Suppliers that are connected to this Channel
150: */
151: private final AtomicInteger numberOfSuppliers_ = new AtomicInteger(
152: 0);
153:
154: private final ProxyEventListener proxyConsumerEventListener_ = new ProxyEventAdapter() {
155: public void actionProxyCreationRequest(ProxyEvent event)
156: throws AdminLimitExceeded {
157: addConsumer();
158: }
159:
160: public void actionProxyDisposed(ProxyEvent event) {
161: removeConsumer();
162: }
163: };
164:
165: private final ProxyEventListener proxySupplierEventListener_ = new ProxyEventAdapter() {
166: public void actionProxyCreationRequest(ProxyEvent event)
167: throws AdminLimitExceeded {
168: addSupplier();
169: }
170:
171: public void actionProxyDisposed(ProxyEvent event) {
172: removeSupplier();
173: }
174: };
175:
176: protected final MutablePicoContainer container_;
177:
178: private final int id_;
179:
180: private final AtomicBoolean destroyed_ = new AtomicBoolean(false);
181:
182: protected JMXManageable.JMXCallback jmxCallback_;
183:
184: private final ServantLifecyleControl servantLifecyle_;
185:
186: ////////////////////////////////////////
187:
188: public AbstractEventChannel(IFactory factory, ORB orb, POA poa,
189: Configuration config, FilterFactory filterFactory) {
190: super ();
191:
192: id_ = factory.getChannelID();
193:
194: orb_ = orb;
195: poa_ = poa;
196: configuration_ = config;
197: defaultFilterFactory_ = filterFactory;
198: container_ = factory.getContainer();
199:
200: logger_ = ((org.jacorb.config.Configuration) config)
201: .getNamedLogger(getClass().getName());
202:
203: container_
204: .registerComponentImplementation(SubscriptionManager.class);
205:
206: container_.registerComponentImplementation(OfferManager.class);
207:
208: adminSettings_ = new AdminPropertySet(configuration_);
209:
210: qosSettings_ = new QoSPropertySet(configuration_,
211: QoSPropertySet.CHANNEL_QOS);
212:
213: listManager_ = new FilterStageListManager() {
214: public void fetchListData(
215: FilterStageListManager.FilterStageList list) {
216: synchronized (modifyConsumerAdminsLock_) {
217: Iterator i = consumerAdminServants_.keySet()
218: .iterator();
219:
220: while (i.hasNext()) {
221: Integer _key = (Integer) i.next();
222: list.add((FilterStage) consumerAdminServants_
223: .get(_key));
224: }
225: }
226: }
227: };
228:
229: servantLifecyle_ = new ServantLifecyleControl(this , config);
230: }
231:
232: ////////////////////////////////////////
233:
234: public final void deactivate() {
235: servantLifecyle_.deactivate();
236: }
237:
238: public final org.omg.CORBA.Object activate() {
239: return servantLifecyle_.activate();
240: }
241:
242: /**
243: * Callback to help keep track of the number of Consumers.
244: *
245: * @exception AdminLimitExceeded
246: * if creation of another Consumer is prohibited.
247: */
248: private void addConsumer() throws AdminLimitExceeded {
249: final int _maxNumberOfConsumers = maxNumberOfConsumers_.get();
250: final int _numberOfConsumers = numberOfConsumers_
251: .incrementAndGet();
252:
253: if (_maxNumberOfConsumers == 0) {
254: // no limit set
255: } else if (_numberOfConsumers > _maxNumberOfConsumers) {
256: // too many consumers
257: numberOfConsumers_.decrementAndGet();
258: Any _any = orb_.create_any();
259: _any.insert_long(_maxNumberOfConsumers);
260:
261: AdminLimit _limit = new AdminLimit("consumer limit", _any);
262:
263: throw new AdminLimitExceeded(
264: "Consumer creation request exceeds AdminLimit.",
265: _limit);
266: }
267: }
268:
269: private void removeConsumer() {
270: numberOfConsumers_.decrementAndGet();
271: }
272:
273: /**
274: * Callback to keep track of the number of Suppliers
275: *
276: * @exception AdminLimitExceeded
277: * if creation of another Suppliers is prohibited
278: */
279: private void addSupplier() throws AdminLimitExceeded {
280: final int _numberOfSuppliers = numberOfSuppliers_
281: .incrementAndGet();
282: final int _maxNumberOfSuppliers = maxNumberOfSuppliers_.get();
283:
284: if (_maxNumberOfSuppliers == 0) {
285: // no limit set
286: } else if (_numberOfSuppliers > _maxNumberOfSuppliers) {
287: // too many suppliers
288: numberOfSuppliers_.decrementAndGet();
289:
290: Any _any = orb_.create_any();
291: _any.insert_long(_maxNumberOfSuppliers);
292:
293: AdminLimit _limit = new AdminLimit("supplier limit", _any);
294:
295: throw new AdminLimitExceeded(
296: "supplier creation request exceeds AdminLimit.",
297: _limit);
298: }
299: }
300:
301: private void removeSupplier() {
302: numberOfSuppliers_.decrementAndGet();
303: }
304:
305: protected final boolean isDefaultConsumerAdminActive() {
306: synchronized (modifyConsumerAdminsLock_) {
307: return consumerAdminServants_
308: .containsKey(DEFAULT_ADMIN_KEY);
309: }
310: }
311:
312: protected final boolean isDefaultSupplierAdminActive() {
313: synchronized (modifySupplierAdminsLock_) {
314: return supplierAdminServants_
315: .containsKey(DEFAULT_ADMIN_KEY);
316: }
317: }
318:
319: /**
320: * The default_filter_factory attribute is a readonly attribute that maintains an object
321: * reference to the default factory to be used by the EventChannel instance with which it is
322: * associated for creating filter objects. If the target channel does not support a default
323: * filter factory, the attribute will maintain the value of OBJECT_NIL.
324: */
325: public final FilterFactory default_filter_factory() {
326: return defaultFilterFactory_;
327: }
328:
329: public final int[] get_all_consumeradmins() {
330: synchronized (modifyConsumerAdminsLock_) {
331: final int[] _allConsumerAdminKeys = new int[consumerAdminServants_
332: .size()];
333: final Iterator i = consumerAdminServants_.keySet()
334: .iterator();
335:
336: for (int x = 0; i.hasNext(); ++x) {
337: _allConsumerAdminKeys[x] = ((Integer) i.next())
338: .intValue();
339: }
340: return _allConsumerAdminKeys;
341: }
342: }
343:
344: public final int[] get_all_supplieradmins() {
345: synchronized (modifySupplierAdminsLock_) {
346: final int[] _allSupplierAdminKeys = new int[supplierAdminServants_
347: .size()];
348: final Iterator i = supplierAdminServants_.keySet()
349: .iterator();
350:
351: for (int x = 0; i.hasNext(); ++x) {
352: _allSupplierAdminKeys[x] = ((Integer) i.next())
353: .intValue();
354: }
355: return _allSupplierAdminKeys;
356: }
357: }
358:
359: public final Property[] get_admin() {
360: return adminSettings_.toArray();
361: }
362:
363: public final Property[] get_qos() {
364: return qosSettings_.toArray();
365: }
366:
367: public final void set_qos(Property[] props) throws UnsupportedQoS {
368: if (logger_.isDebugEnabled()) {
369: logger_.debug("AbstractEventChannel.set_qos: "
370: + qosSettings_);
371: }
372:
373: qosSettings_.validate_qos(props,
374: new NamedPropertyRangeSeqHolder());
375:
376: qosSettings_.set_qos(props);
377: }
378:
379: public final void validate_qos(Property[] props,
380: NamedPropertyRangeSeqHolder namedPropertySeqHolder)
381: throws UnsupportedQoS {
382: qosSettings_.validate_qos(props, namedPropertySeqHolder);
383: }
384:
385: public final void set_admin(Property[] adminProps)
386: throws UnsupportedAdmin {
387: adminSettings_.validate_admin(adminProps);
388:
389: adminSettings_.set_admin(adminProps);
390:
391: configureAdminLimits(adminSettings_);
392: }
393:
394: private void configureAdminLimits(PropertySet adminProperties) {
395: Any _maxConsumers = adminProperties.get(MaxConsumers.value);
396: setMaxNumberOfConsumers(_maxConsumers.extract_long());
397:
398: Any _maxSuppliers = adminProperties.get(MaxSuppliers.value);
399: setMaxNumberOfSuppliers(_maxSuppliers.extract_long());
400: }
401:
402: /**
403: * destroy this Channel, all created Admins and all Proxies.
404: *
405: * @jmx.managed-operation description = "Destroy this Channel"
406: * impact = "ACTION"
407: */
408: public final void destroy() {
409: if (destroyed_.compareAndSet(false, true)) {
410: container_.dispose();
411:
412: final List list = container_
413: .getComponentInstancesOfType(IContainer.class);
414:
415: for (Iterator i = list.iterator(); i.hasNext();) {
416: IContainer element = (IContainer) i.next();
417: element.destroy();
418: }
419: } else {
420: throw new OBJECT_NOT_EXIST();
421: }
422: }
423:
424: public final void dispose() {
425: if (logger_.isInfoEnabled()) {
426: logger_.info("destroy channel " + id_);
427: }
428:
429: deactivate();
430:
431: disposables_.dispose();
432: }
433:
434: public final POA getPOA() {
435: return poa_;
436: }
437:
438: public boolean isPersistent() {
439: return false;
440: }
441:
442: /**
443: * get the number of clients connected to this event channel. the number is the total of all
444: * Suppliers and Consumers connected to this channel.
445: */
446: public final int getNumberOfConnectedClients() {
447: return numberOfConsumers_.get() + numberOfSuppliers_.get();
448: }
449:
450: /**
451: * @jmx.managed-attribute description = "maximum number of suppliers that are allowed at a time"
452: * access = "read-write"
453: */
454: public final int getMaxNumberOfSuppliers() {
455: return maxNumberOfSuppliers_.get();
456: }
457:
458: /**
459: * @jmx.managed-attribute access = "read-write"
460: */
461: public void setMaxNumberOfSuppliers(int max) {
462: if (max < 0) {
463: throw new IllegalArgumentException();
464: }
465:
466: maxNumberOfSuppliers_.set(max);
467:
468: if (logger_.isInfoEnabled()) {
469: logger_.info("set MaxNumberOfSuppliers="
470: + maxNumberOfSuppliers_);
471: }
472: }
473:
474: /**
475: * @jmx.managed-attribute description = "maximum number of consumers that are allowed at a time"
476: * access = "read-write"
477: */
478: public final int getMaxNumberOfConsumers() {
479: return maxNumberOfConsumers_.get();
480: }
481:
482: /**
483: * @jmx.managed-attribute access = "read-write"
484: */
485: public void setMaxNumberOfConsumers(int max) {
486: if (max < 0) {
487: throw new IllegalArgumentException();
488: }
489:
490: maxNumberOfConsumers_.set(max);
491:
492: if (logger_.isInfoEnabled()) {
493: logger_.info("set MaxNumberOfConsumers="
494: + maxNumberOfConsumers_);
495: }
496: }
497:
498: private Property[] createQoSPropertiesForAdmin() {
499: Map _copy = new HashMap(qosSettings_.toMap());
500:
501: // remove properties that are not relevant for admins
502: _copy.remove(EventReliability.value);
503:
504: return PropertySet.map2Props(_copy);
505: }
506:
507: protected AbstractAdmin get_consumeradmin_internal(int identifier)
508: throws AdminNotFound {
509: synchronized (modifyConsumerAdminsLock_) {
510: Integer _key = new Integer(identifier);
511:
512: if (consumerAdminServants_.containsKey(_key)) {
513: return (AbstractAdmin) consumerAdminServants_.get(_key);
514: }
515:
516: throw new AdminNotFound("ID " + identifier
517: + " does not exist.");
518: }
519: }
520:
521: protected AbstractAdmin get_supplieradmin_internal(int identifier)
522: throws AdminNotFound {
523: synchronized (modifySupplierAdminsLock_) {
524: Integer _key = new Integer(identifier);
525:
526: if (supplierAdminServants_.containsKey(_key)) {
527: return (AbstractAdmin) supplierAdminServants_.get(_key);
528: }
529:
530: throw new AdminNotFound("ID " + identifier
531: + " does not exist.");
532: }
533: }
534:
535: /**
536: * fetch the List of all ConsumerAdmins that are connected to this EventChannel.
537: */
538: private List getAllConsumerAdmins() {
539: return listManager_.getList();
540: }
541:
542: protected AbstractAdmin getDefaultConsumerAdminServant() {
543: AbstractAdmin _admin;
544:
545: synchronized (modifyConsumerAdminsLock_) {
546: _admin = (AbstractAdmin) consumerAdminServants_
547: .get(DEFAULT_ADMIN_KEY);
548:
549: if (_admin == null) {
550: _admin = newConsumerAdminServant(DEFAULT_ADMIN_KEY
551: .intValue());
552: _admin
553: .setInterFilterGroupOperator(InterFilterGroupOperator.AND_OP);
554: try {
555: _admin.set_qos(createQoSPropertiesForAdmin());
556: } catch (UnsupportedQoS e) {
557: logger_.error("unable to set qos", e);
558: }
559:
560: addToConsumerAdmins(_admin);
561: }
562: }
563:
564: return _admin;
565: }
566:
567: private void addToConsumerAdmins(AbstractAdmin admin) {
568: final Integer _key = admin.getID();
569:
570: admin.registerDisposable(new Disposable() {
571: public void dispose() {
572: synchronized (modifyConsumerAdminsLock_) {
573: consumerAdminServants_.remove(_key);
574: listManager_.actionSourceModified();
575: }
576: }
577: });
578:
579: synchronized (modifyConsumerAdminsLock_) {
580: consumerAdminServants_.put(_key, admin);
581:
582: listManager_.actionSourceModified();
583: }
584: }
585:
586: protected AbstractAdmin new_for_consumers_servant(
587: InterFilterGroupOperator filterGroupOperator,
588: IntHolder intHolder) {
589: final AbstractAdmin _admin = newConsumerAdminServant(createAdminID());
590:
591: intHolder.value = _admin.getID().intValue();
592:
593: _admin.setInterFilterGroupOperator(filterGroupOperator);
594:
595: try {
596: _admin.set_qos(createQoSPropertiesForAdmin());
597: } catch (UnsupportedQoS e) {
598: logger_.error("unable to set QoS", e);
599: }
600:
601: _admin.addProxyEventListener(proxySupplierEventListener_);
602:
603: addToConsumerAdmins(_admin);
604:
605: return _admin;
606: }
607:
608: private int createAdminID() {
609: return adminIdPool_.incrementAndGet();
610: }
611:
612: private void addToSupplierAdmins(AbstractAdmin admin) {
613: final Integer _key = admin.getID();
614:
615: admin.registerDisposable(new Disposable() {
616: public void dispose() {
617: synchronized (modifySupplierAdminsLock_) {
618: supplierAdminServants_.remove(_key);
619: }
620: }
621: });
622:
623: synchronized (modifySupplierAdminsLock_) {
624: supplierAdminServants_.put(_key, admin);
625: }
626: }
627:
628: protected AbstractAdmin new_for_suppliers_servant(
629: InterFilterGroupOperator filterGroupOperator,
630: IntHolder intHolder) {
631: final AbstractAdmin _admin = newSupplierAdminServant(createAdminID());
632:
633: intHolder.value = _admin.getID().intValue();
634:
635: _admin.setInterFilterGroupOperator(filterGroupOperator);
636:
637: try {
638: _admin.set_qos(createQoSPropertiesForAdmin());
639: } catch (UnsupportedQoS e) {
640: logger_.error("unable to set QoS", e);
641: }
642:
643: _admin.addProxyEventListener(proxyConsumerEventListener_);
644:
645: addToSupplierAdmins(_admin);
646:
647: return _admin;
648: }
649:
650: protected AbstractAdmin getDefaultSupplierAdminServant() {
651: AbstractAdmin _admin;
652:
653: synchronized (modifySupplierAdminsLock_) {
654: _admin = (AbstractAdmin) supplierAdminServants_
655: .get(DEFAULT_ADMIN_KEY);
656:
657: if (_admin == null) {
658: _admin = newSupplierAdminServant(DEFAULT_ADMIN_KEY
659: .intValue());
660: _admin
661: .setInterFilterGroupOperator(InterFilterGroupOperator.AND_OP);
662: try {
663: _admin.set_qos(createQoSPropertiesForAdmin());
664: } catch (UnsupportedQoS e) {
665: logger_.error("unable to set qos", e);
666: }
667:
668: addToSupplierAdmins(_admin);
669: }
670: }
671:
672: return _admin;
673: }
674:
675: ////////////////////////////////////////
676:
677: private AbstractAdmin newConsumerAdminServant(int id) {
678: return newConsumerAdmin(id);
679: }
680:
681: protected abstract AbstractAdmin newConsumerAdmin(int id);
682:
683: ////////////////////////////////////////
684:
685: private static class FilterStageSourceAdapter implements
686: FilterStageSource {
687: final WeakReference channelRef_;
688:
689: FilterStageSourceAdapter(AbstractEventChannel channel) {
690: channelRef_ = new WeakReference(channel);
691: }
692:
693: public List getSubsequentFilterStages() {
694: return ((AbstractEventChannel) channelRef_.get())
695: .getAllConsumerAdmins();
696: }
697: }
698:
699: private AbstractAdmin newSupplierAdminServant(int id) {
700: final AbstractSupplierAdmin _admin = newSupplierAdmin(id);
701:
702: _admin
703: .setSubsequentFilterStageSource(new FilterStageSourceAdapter(
704: this ));
705:
706: return _admin;
707: }
708:
709: protected abstract AbstractSupplierAdmin newSupplierAdmin(int id);
710:
711: /**
712: * @jmx.managed-attribute description="ID that identifies this EventChannel"
713: * access = "read-only"
714: * currencyTimeLimit = "2147483647"
715: */
716: public int getID() {
717: return id_;
718: }
719:
720: public final void registerDisposable(Disposable d) {
721: disposables_.addDisposable(d);
722: }
723:
724: public final String getJMXObjectName() {
725: return "channel=" + getMBeanName();
726: }
727:
728: public final String getMBeanName() {
729: return getMBeanType() + "-" + getID();
730: }
731:
732: protected abstract String getMBeanType();
733:
734: public String[] getJMXNotificationTypes() {
735: return new String[0];
736: }
737:
738: public void setJMXCallback(JMXManageable.JMXCallback callback) {
739: jmxCallback_ = callback;
740: }
741: }
|