Source Code Cross Referenced for AbstractEventChannel.java in  » Collaboration » JacORB » org » jacorb » notification » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Collaboration » JacORB » org.jacorb.notification 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jacorb.notification;
002:
003:        /*
004:         *        JacORB - a free Java ORB
005:         *
006:         *   Copyright (C) 1997-2004 Gerald Brose.
007:         *
008:         *   This library is free software; you can redistribute it and/or
009:         *   modify it under the terms of the GNU Library General Public
010:         *   License as published by the Free Software Foundation; either
011:         *   version 2 of the License, or (at your option) any later version.
012:         *
013:         *   This library is distributed in the hope that it will be useful,
014:         *   but WITHOUT ANY WARRANTY; without even the implied warranty of
015:         *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
016:         *   Library General Public License for more details.
017:         *
018:         *   You should have received a copy of the GNU Library General Public
019:         *   License along with this library; if not, write to the Free
020:         *   Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021:         */
022:
023:        import java.lang.ref.WeakReference;
024:        import java.util.HashMap;
025:        import java.util.Iterator;
026:        import java.util.List;
027:        import java.util.Map;
028:
029:        import org.apache.avalon.framework.configuration.Configuration;
030:        import org.apache.avalon.framework.logger.Logger;
031:        import org.jacorb.notification.interfaces.Disposable;
032:        import org.jacorb.notification.interfaces.FilterStage;
033:        import org.jacorb.notification.interfaces.FilterStageSource;
034:        import org.jacorb.notification.interfaces.JMXManageable;
035:        import org.jacorb.notification.interfaces.ProxyEvent;
036:        import org.jacorb.notification.interfaces.ProxyEventAdapter;
037:        import org.jacorb.notification.interfaces.ProxyEventListener;
038:        import org.jacorb.notification.lifecycle.IServantLifecyle;
039:        import org.jacorb.notification.lifecycle.ServantLifecyleControl;
040:        import org.jacorb.notification.servant.AbstractAdmin;
041:        import org.jacorb.notification.servant.AbstractSupplierAdmin;
042:        import org.jacorb.notification.servant.FilterStageListManager;
043:        import org.jacorb.notification.util.AdminPropertySet;
044:        import org.jacorb.notification.util.DisposableManager;
045:        import org.jacorb.notification.util.PropertySet;
046:        import org.jacorb.notification.util.QoSPropertySet;
047:        import org.omg.CORBA.Any;
048:        import org.omg.CORBA.IntHolder;
049:        import org.omg.CORBA.OBJECT_NOT_EXIST;
050:        import org.omg.CORBA.ORB;
051:        import org.omg.CosNotification.EventReliability;
052:        import org.omg.CosNotification.MaxConsumers;
053:        import org.omg.CosNotification.MaxSuppliers;
054:        import org.omg.CosNotification.NamedPropertyRangeSeqHolder;
055:        import org.omg.CosNotification.Property;
056:        import org.omg.CosNotification.UnsupportedAdmin;
057:        import org.omg.CosNotification.UnsupportedQoS;
058:        import org.omg.CosNotifyChannelAdmin.AdminLimit;
059:        import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
060:        import org.omg.CosNotifyChannelAdmin.AdminNotFound;
061:        import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator;
062:        import org.omg.CosNotifyFilter.FilterFactory;
063:        import org.omg.PortableServer.POA;
064:        import org.picocontainer.MutablePicoContainer;
065:
066:        import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
067:        import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
068:
069:        /**
070:         * @jmx.mbean
071:         * @jboss.xmbean
072:         *
073:         * @author Alphonse Bendt
074:         * @version $Id: AbstractEventChannel.java,v 1.14 2006/05/23 10:22:20 alphonse.bendt Exp $
075:         */
076:
077:        public abstract class AbstractEventChannel implements  IServantLifecyle,
078:                JMXManageable {
079:            /**
080:             * This key is reserved for the default supplier admin and the default consumer admin.
081:             */
082:            private static final Integer DEFAULT_ADMIN_KEY = new Integer(0);
083:
084:            private final DisposableManager disposables_ = new DisposableManager();
085:
086:            protected final Logger logger_;
087:
088:            protected final ORB orb_;
089:
090:            private final POA poa_;
091:
092:            private final Configuration configuration_;
093:
094:            /**
095:             * max number of Suppliers that may be connected at a time to this Channel (0=unlimited)
096:             */
097:            private final AtomicInteger maxNumberOfSuppliers_ = new AtomicInteger(
098:                    0);
099:
100:            /**
101:             * max number of Consumers that may be connected at a time to this Channel (0=unlimited)
102:             */
103:            private final AtomicInteger maxNumberOfConsumers_ = new AtomicInteger(
104:                    0);
105:
106:            private final AdminPropertySet adminSettings_;
107:
108:            private final QoSPropertySet qosSettings_;
109:
110:            private final FilterStageListManager listManager_;
111:
112:            private final FilterFactory defaultFilterFactory_;
113:
114:            /**
115:             * lock variable used to access allConsumerAdmins_ and consumerAdminServants_.
116:             */
117:            private final Object modifyConsumerAdminsLock_ = new Object();
118:
119:            /**
120:             * lock variable used to access allConsumerAdmins_.
121:             */
122:            private final Object modifySupplierAdminsLock_ = new Object();
123:
124:            /**
125:             * maps id's to ConsumerAdminServants (notify style).
126:             */
127:            private final Map consumerAdminServants_ = new HashMap();
128:
129:            /**
130:             * maps id's to SupplierAdminServants (notify style).
131:             */
132:            private final Map supplierAdminServants_ = new HashMap();
133:
134:            /**
135:             * pool of available ID's for Admin Objects. The Pool is used for Consumer and Supplier Admins.
136:             * NOTE: The least available ID is 1 as the ID 0 has a special meaning.
137:             *
138:             * @see #DEFAULT_ADMIN_KEY DEFAULT_ADMIN_KEY.
139:             */
140:            private final AtomicInteger adminIdPool_ = new AtomicInteger(1);
141:
142:            /**
143:             * number of Consumers that are connected to this Channel
144:             */
145:            private final AtomicInteger numberOfConsumers_ = new AtomicInteger(
146:                    0);
147:
148:            /**
149:             * number of Suppliers that are connected to this Channel
150:             */
151:            private final AtomicInteger numberOfSuppliers_ = new AtomicInteger(
152:                    0);
153:
154:            private final ProxyEventListener proxyConsumerEventListener_ = new ProxyEventAdapter() {
155:                public void actionProxyCreationRequest(ProxyEvent event)
156:                        throws AdminLimitExceeded {
157:                    addConsumer();
158:                }
159:
160:                public void actionProxyDisposed(ProxyEvent event) {
161:                    removeConsumer();
162:                }
163:            };
164:
165:            private final ProxyEventListener proxySupplierEventListener_ = new ProxyEventAdapter() {
166:                public void actionProxyCreationRequest(ProxyEvent event)
167:                        throws AdminLimitExceeded {
168:                    addSupplier();
169:                }
170:
171:                public void actionProxyDisposed(ProxyEvent event) {
172:                    removeSupplier();
173:                }
174:            };
175:
176:            protected final MutablePicoContainer container_;
177:
178:            private final int id_;
179:
180:            private final AtomicBoolean destroyed_ = new AtomicBoolean(false);
181:
182:            protected JMXManageable.JMXCallback jmxCallback_;
183:
184:            private final ServantLifecyleControl servantLifecyle_;
185:
186:            ////////////////////////////////////////
187:
188:            public AbstractEventChannel(IFactory factory, ORB orb, POA poa,
189:                    Configuration config, FilterFactory filterFactory) {
190:                super ();
191:
192:                id_ = factory.getChannelID();
193:
194:                orb_ = orb;
195:                poa_ = poa;
196:                configuration_ = config;
197:                defaultFilterFactory_ = filterFactory;
198:                container_ = factory.getContainer();
199:
200:                logger_ = ((org.jacorb.config.Configuration) config)
201:                        .getNamedLogger(getClass().getName());
202:
203:                container_
204:                        .registerComponentImplementation(SubscriptionManager.class);
205:
206:                container_.registerComponentImplementation(OfferManager.class);
207:
208:                adminSettings_ = new AdminPropertySet(configuration_);
209:
210:                qosSettings_ = new QoSPropertySet(configuration_,
211:                        QoSPropertySet.CHANNEL_QOS);
212:
213:                listManager_ = new FilterStageListManager() {
214:                    public void fetchListData(
215:                            FilterStageListManager.FilterStageList list) {
216:                        synchronized (modifyConsumerAdminsLock_) {
217:                            Iterator i = consumerAdminServants_.keySet()
218:                                    .iterator();
219:
220:                            while (i.hasNext()) {
221:                                Integer _key = (Integer) i.next();
222:                                list.add((FilterStage) consumerAdminServants_
223:                                        .get(_key));
224:                            }
225:                        }
226:                    }
227:                };
228:
229:                servantLifecyle_ = new ServantLifecyleControl(this , config);
230:            }
231:
232:            ////////////////////////////////////////
233:
234:            public final void deactivate() {
235:                servantLifecyle_.deactivate();
236:            }
237:
238:            public final org.omg.CORBA.Object activate() {
239:                return servantLifecyle_.activate();
240:            }
241:
242:            /**
243:             * Callback to help keep track of the number of Consumers.
244:             *
245:             * @exception AdminLimitExceeded
246:             *                if creation of another Consumer is prohibited.
247:             */
248:            private void addConsumer() throws AdminLimitExceeded {
249:                final int _maxNumberOfConsumers = maxNumberOfConsumers_.get();
250:                final int _numberOfConsumers = numberOfConsumers_
251:                        .incrementAndGet();
252:
253:                if (_maxNumberOfConsumers == 0) {
254:                    // no limit set
255:                } else if (_numberOfConsumers > _maxNumberOfConsumers) {
256:                    // too many consumers
257:                    numberOfConsumers_.decrementAndGet();
258:                    Any _any = orb_.create_any();
259:                    _any.insert_long(_maxNumberOfConsumers);
260:
261:                    AdminLimit _limit = new AdminLimit("consumer limit", _any);
262:
263:                    throw new AdminLimitExceeded(
264:                            "Consumer creation request exceeds AdminLimit.",
265:                            _limit);
266:                }
267:            }
268:
269:            private void removeConsumer() {
270:                numberOfConsumers_.decrementAndGet();
271:            }
272:
273:            /**
274:             * Callback to keep track of the number of Suppliers
275:             *
276:             * @exception AdminLimitExceeded
277:             *                if creation of another Suppliers is prohibited
278:             */
279:            private void addSupplier() throws AdminLimitExceeded {
280:                final int _numberOfSuppliers = numberOfSuppliers_
281:                        .incrementAndGet();
282:                final int _maxNumberOfSuppliers = maxNumberOfSuppliers_.get();
283:
284:                if (_maxNumberOfSuppliers == 0) {
285:                    // no limit set
286:                } else if (_numberOfSuppliers > _maxNumberOfSuppliers) {
287:                    // too many suppliers
288:                    numberOfSuppliers_.decrementAndGet();
289:
290:                    Any _any = orb_.create_any();
291:                    _any.insert_long(_maxNumberOfSuppliers);
292:
293:                    AdminLimit _limit = new AdminLimit("supplier limit", _any);
294:
295:                    throw new AdminLimitExceeded(
296:                            "supplier creation request exceeds AdminLimit.",
297:                            _limit);
298:                }
299:            }
300:
301:            private void removeSupplier() {
302:                numberOfSuppliers_.decrementAndGet();
303:            }
304:
305:            protected final boolean isDefaultConsumerAdminActive() {
306:                synchronized (modifyConsumerAdminsLock_) {
307:                    return consumerAdminServants_
308:                            .containsKey(DEFAULT_ADMIN_KEY);
309:                }
310:            }
311:
312:            protected final boolean isDefaultSupplierAdminActive() {
313:                synchronized (modifySupplierAdminsLock_) {
314:                    return supplierAdminServants_
315:                            .containsKey(DEFAULT_ADMIN_KEY);
316:                }
317:            }
318:
319:            /**
320:             * The default_filter_factory attribute is a readonly attribute that maintains an object
321:             * reference to the default factory to be used by the EventChannel instance with which it is
322:             * associated for creating filter objects. If the target channel does not support a default
323:             * filter factory, the attribute will maintain the value of OBJECT_NIL.
324:             */
325:            public final FilterFactory default_filter_factory() {
326:                return defaultFilterFactory_;
327:            }
328:
329:            public final int[] get_all_consumeradmins() {
330:                synchronized (modifyConsumerAdminsLock_) {
331:                    final int[] _allConsumerAdminKeys = new int[consumerAdminServants_
332:                            .size()];
333:                    final Iterator i = consumerAdminServants_.keySet()
334:                            .iterator();
335:
336:                    for (int x = 0; i.hasNext(); ++x) {
337:                        _allConsumerAdminKeys[x] = ((Integer) i.next())
338:                                .intValue();
339:                    }
340:                    return _allConsumerAdminKeys;
341:                }
342:            }
343:
344:            public final int[] get_all_supplieradmins() {
345:                synchronized (modifySupplierAdminsLock_) {
346:                    final int[] _allSupplierAdminKeys = new int[supplierAdminServants_
347:                            .size()];
348:                    final Iterator i = supplierAdminServants_.keySet()
349:                            .iterator();
350:
351:                    for (int x = 0; i.hasNext(); ++x) {
352:                        _allSupplierAdminKeys[x] = ((Integer) i.next())
353:                                .intValue();
354:                    }
355:                    return _allSupplierAdminKeys;
356:                }
357:            }
358:
359:            public final Property[] get_admin() {
360:                return adminSettings_.toArray();
361:            }
362:
363:            public final Property[] get_qos() {
364:                return qosSettings_.toArray();
365:            }
366:
367:            public final void set_qos(Property[] props) throws UnsupportedQoS {
368:                if (logger_.isDebugEnabled()) {
369:                    logger_.debug("AbstractEventChannel.set_qos: "
370:                            + qosSettings_);
371:                }
372:
373:                qosSettings_.validate_qos(props,
374:                        new NamedPropertyRangeSeqHolder());
375:
376:                qosSettings_.set_qos(props);
377:            }
378:
379:            public final void validate_qos(Property[] props,
380:                    NamedPropertyRangeSeqHolder namedPropertySeqHolder)
381:                    throws UnsupportedQoS {
382:                qosSettings_.validate_qos(props, namedPropertySeqHolder);
383:            }
384:
385:            public final void set_admin(Property[] adminProps)
386:                    throws UnsupportedAdmin {
387:                adminSettings_.validate_admin(adminProps);
388:
389:                adminSettings_.set_admin(adminProps);
390:
391:                configureAdminLimits(adminSettings_);
392:            }
393:
394:            private void configureAdminLimits(PropertySet adminProperties) {
395:                Any _maxConsumers = adminProperties.get(MaxConsumers.value);
396:                setMaxNumberOfConsumers(_maxConsumers.extract_long());
397:
398:                Any _maxSuppliers = adminProperties.get(MaxSuppliers.value);
399:                setMaxNumberOfSuppliers(_maxSuppliers.extract_long());
400:            }
401:
402:            /**
403:             * destroy this Channel, all created Admins and all Proxies.
404:             *
405:             * @jmx.managed-operation   description = "Destroy this Channel"
406:             *                          impact = "ACTION"
407:             */
408:            public final void destroy() {
409:                if (destroyed_.compareAndSet(false, true)) {
410:                    container_.dispose();
411:
412:                    final List list = container_
413:                            .getComponentInstancesOfType(IContainer.class);
414:
415:                    for (Iterator i = list.iterator(); i.hasNext();) {
416:                        IContainer element = (IContainer) i.next();
417:                        element.destroy();
418:                    }
419:                } else {
420:                    throw new OBJECT_NOT_EXIST();
421:                }
422:            }
423:
424:            public final void dispose() {
425:                if (logger_.isInfoEnabled()) {
426:                    logger_.info("destroy channel " + id_);
427:                }
428:
429:                deactivate();
430:
431:                disposables_.dispose();
432:            }
433:
434:            public final POA getPOA() {
435:                return poa_;
436:            }
437:
438:            public boolean isPersistent() {
439:                return false;
440:            }
441:
442:            /**
443:             * get the number of clients connected to this event channel. the number is the total of all
444:             * Suppliers and Consumers connected to this channel.
445:             */
446:            public final int getNumberOfConnectedClients() {
447:                return numberOfConsumers_.get() + numberOfSuppliers_.get();
448:            }
449:
450:            /**
451:             * @jmx.managed-attribute description = "maximum number of suppliers that are allowed at a time"
452:             *                        access = "read-write"
453:             */
454:            public final int getMaxNumberOfSuppliers() {
455:                return maxNumberOfSuppliers_.get();
456:            }
457:
458:            /**
459:             * @jmx.managed-attribute access = "read-write"
460:             */
461:            public void setMaxNumberOfSuppliers(int max) {
462:                if (max < 0) {
463:                    throw new IllegalArgumentException();
464:                }
465:
466:                maxNumberOfSuppliers_.set(max);
467:
468:                if (logger_.isInfoEnabled()) {
469:                    logger_.info("set MaxNumberOfSuppliers="
470:                            + maxNumberOfSuppliers_);
471:                }
472:            }
473:
474:            /**
475:             * @jmx.managed-attribute description = "maximum number of consumers that are allowed at a time"
476:             *                        access = "read-write"
477:             */
478:            public final int getMaxNumberOfConsumers() {
479:                return maxNumberOfConsumers_.get();
480:            }
481:
482:            /**
483:             * @jmx.managed-attribute access = "read-write"
484:             */
485:            public void setMaxNumberOfConsumers(int max) {
486:                if (max < 0) {
487:                    throw new IllegalArgumentException();
488:                }
489:
490:                maxNumberOfConsumers_.set(max);
491:
492:                if (logger_.isInfoEnabled()) {
493:                    logger_.info("set MaxNumberOfConsumers="
494:                            + maxNumberOfConsumers_);
495:                }
496:            }
497:
498:            private Property[] createQoSPropertiesForAdmin() {
499:                Map _copy = new HashMap(qosSettings_.toMap());
500:
501:                // remove properties that are not relevant for admins
502:                _copy.remove(EventReliability.value);
503:
504:                return PropertySet.map2Props(_copy);
505:            }
506:
507:            protected AbstractAdmin get_consumeradmin_internal(int identifier)
508:                    throws AdminNotFound {
509:                synchronized (modifyConsumerAdminsLock_) {
510:                    Integer _key = new Integer(identifier);
511:
512:                    if (consumerAdminServants_.containsKey(_key)) {
513:                        return (AbstractAdmin) consumerAdminServants_.get(_key);
514:                    }
515:
516:                    throw new AdminNotFound("ID " + identifier
517:                            + " does not exist.");
518:                }
519:            }
520:
521:            protected AbstractAdmin get_supplieradmin_internal(int identifier)
522:                    throws AdminNotFound {
523:                synchronized (modifySupplierAdminsLock_) {
524:                    Integer _key = new Integer(identifier);
525:
526:                    if (supplierAdminServants_.containsKey(_key)) {
527:                        return (AbstractAdmin) supplierAdminServants_.get(_key);
528:                    }
529:
530:                    throw new AdminNotFound("ID " + identifier
531:                            + " does not exist.");
532:                }
533:            }
534:
535:            /**
536:             * fetch the List of all ConsumerAdmins that are connected to this EventChannel.
537:             */
538:            private List getAllConsumerAdmins() {
539:                return listManager_.getList();
540:            }
541:
542:            protected AbstractAdmin getDefaultConsumerAdminServant() {
543:                AbstractAdmin _admin;
544:
545:                synchronized (modifyConsumerAdminsLock_) {
546:                    _admin = (AbstractAdmin) consumerAdminServants_
547:                            .get(DEFAULT_ADMIN_KEY);
548:
549:                    if (_admin == null) {
550:                        _admin = newConsumerAdminServant(DEFAULT_ADMIN_KEY
551:                                .intValue());
552:                        _admin
553:                                .setInterFilterGroupOperator(InterFilterGroupOperator.AND_OP);
554:                        try {
555:                            _admin.set_qos(createQoSPropertiesForAdmin());
556:                        } catch (UnsupportedQoS e) {
557:                            logger_.error("unable to set qos", e);
558:                        }
559:
560:                        addToConsumerAdmins(_admin);
561:                    }
562:                }
563:
564:                return _admin;
565:            }
566:
567:            private void addToConsumerAdmins(AbstractAdmin admin) {
568:                final Integer _key = admin.getID();
569:
570:                admin.registerDisposable(new Disposable() {
571:                    public void dispose() {
572:                        synchronized (modifyConsumerAdminsLock_) {
573:                            consumerAdminServants_.remove(_key);
574:                            listManager_.actionSourceModified();
575:                        }
576:                    }
577:                });
578:
579:                synchronized (modifyConsumerAdminsLock_) {
580:                    consumerAdminServants_.put(_key, admin);
581:
582:                    listManager_.actionSourceModified();
583:                }
584:            }
585:
586:            protected AbstractAdmin new_for_consumers_servant(
587:                    InterFilterGroupOperator filterGroupOperator,
588:                    IntHolder intHolder) {
589:                final AbstractAdmin _admin = newConsumerAdminServant(createAdminID());
590:
591:                intHolder.value = _admin.getID().intValue();
592:
593:                _admin.setInterFilterGroupOperator(filterGroupOperator);
594:
595:                try {
596:                    _admin.set_qos(createQoSPropertiesForAdmin());
597:                } catch (UnsupportedQoS e) {
598:                    logger_.error("unable to set QoS", e);
599:                }
600:
601:                _admin.addProxyEventListener(proxySupplierEventListener_);
602:
603:                addToConsumerAdmins(_admin);
604:
605:                return _admin;
606:            }
607:
608:            private int createAdminID() {
609:                return adminIdPool_.incrementAndGet();
610:            }
611:
612:            private void addToSupplierAdmins(AbstractAdmin admin) {
613:                final Integer _key = admin.getID();
614:
615:                admin.registerDisposable(new Disposable() {
616:                    public void dispose() {
617:                        synchronized (modifySupplierAdminsLock_) {
618:                            supplierAdminServants_.remove(_key);
619:                        }
620:                    }
621:                });
622:
623:                synchronized (modifySupplierAdminsLock_) {
624:                    supplierAdminServants_.put(_key, admin);
625:                }
626:            }
627:
628:            protected AbstractAdmin new_for_suppliers_servant(
629:                    InterFilterGroupOperator filterGroupOperator,
630:                    IntHolder intHolder) {
631:                final AbstractAdmin _admin = newSupplierAdminServant(createAdminID());
632:
633:                intHolder.value = _admin.getID().intValue();
634:
635:                _admin.setInterFilterGroupOperator(filterGroupOperator);
636:
637:                try {
638:                    _admin.set_qos(createQoSPropertiesForAdmin());
639:                } catch (UnsupportedQoS e) {
640:                    logger_.error("unable to set QoS", e);
641:                }
642:
643:                _admin.addProxyEventListener(proxyConsumerEventListener_);
644:
645:                addToSupplierAdmins(_admin);
646:
647:                return _admin;
648:            }
649:
650:            protected AbstractAdmin getDefaultSupplierAdminServant() {
651:                AbstractAdmin _admin;
652:
653:                synchronized (modifySupplierAdminsLock_) {
654:                    _admin = (AbstractAdmin) supplierAdminServants_
655:                            .get(DEFAULT_ADMIN_KEY);
656:
657:                    if (_admin == null) {
658:                        _admin = newSupplierAdminServant(DEFAULT_ADMIN_KEY
659:                                .intValue());
660:                        _admin
661:                                .setInterFilterGroupOperator(InterFilterGroupOperator.AND_OP);
662:                        try {
663:                            _admin.set_qos(createQoSPropertiesForAdmin());
664:                        } catch (UnsupportedQoS e) {
665:                            logger_.error("unable to set qos", e);
666:                        }
667:
668:                        addToSupplierAdmins(_admin);
669:                    }
670:                }
671:
672:                return _admin;
673:            }
674:
675:            ////////////////////////////////////////
676:
677:            private AbstractAdmin newConsumerAdminServant(int id) {
678:                return newConsumerAdmin(id);
679:            }
680:
681:            protected abstract AbstractAdmin newConsumerAdmin(int id);
682:
683:            ////////////////////////////////////////
684:
685:            private static class FilterStageSourceAdapter implements 
686:                    FilterStageSource {
687:                final WeakReference channelRef_;
688:
689:                FilterStageSourceAdapter(AbstractEventChannel channel) {
690:                    channelRef_ = new WeakReference(channel);
691:                }
692:
693:                public List getSubsequentFilterStages() {
694:                    return ((AbstractEventChannel) channelRef_.get())
695:                            .getAllConsumerAdmins();
696:                }
697:            }
698:
699:            private AbstractAdmin newSupplierAdminServant(int id) {
700:                final AbstractSupplierAdmin _admin = newSupplierAdmin(id);
701:
702:                _admin
703:                        .setSubsequentFilterStageSource(new FilterStageSourceAdapter(
704:                                this ));
705:
706:                return _admin;
707:            }
708:
709:            protected abstract AbstractSupplierAdmin newSupplierAdmin(int id);
710:
711:            /**
712:             * @jmx.managed-attribute description="ID that identifies this EventChannel"
713:             *                        access = "read-only"
714:             *                        currencyTimeLimit = "2147483647"
715:             */
716:            public int getID() {
717:                return id_;
718:            }
719:
720:            public final void registerDisposable(Disposable d) {
721:                disposables_.addDisposable(d);
722:            }
723:
724:            public final String getJMXObjectName() {
725:                return "channel=" + getMBeanName();
726:            }
727:
728:            public final String getMBeanName() {
729:                return getMBeanType() + "-" + getID();
730:            }
731:
732:            protected abstract String getMBeanType();
733:
734:            public String[] getJMXNotificationTypes() {
735:                return new String[0];
736:            }
737:
738:            public void setJMXCallback(JMXManageable.JMXCallback callback) {
739:                jmxCallback_ = callback;
740:            }
741:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.