001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2003 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import java.util.Collections;
025: import java.util.Comparator;
026: import java.util.Iterator;
027: import java.util.List;
028: import java.util.Map;
029:
030: import org.apache.avalon.framework.configuration.Configuration;
031: import org.jacorb.notification.MessageFactory;
032: import org.jacorb.notification.OfferManager;
033: import org.jacorb.notification.SubscriptionManager;
034: import org.jacorb.notification.container.CORBAObjectComponentAdapter;
035: import org.jacorb.notification.interfaces.Disposable;
036: import org.jacorb.notification.interfaces.FilterStage;
037: import org.jacorb.notification.interfaces.MessageConsumer;
038: import org.jacorb.notification.interfaces.ProxyEvent;
039: import org.jacorb.notification.interfaces.ProxyEventListener;
040: import org.omg.CORBA.BAD_PARAM;
041: import org.omg.CORBA.IntHolder;
042: import org.omg.CORBA.ORB;
043: import org.omg.CORBA.UNKNOWN;
044: import org.omg.CosEventChannelAdmin.ProxyPullSupplier;
045: import org.omg.CosEventChannelAdmin.ProxyPushSupplier;
046: import org.omg.CosNotification.EventType;
047: import org.omg.CosNotification.UnsupportedQoS;
048: import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
049: import org.omg.CosNotifyChannelAdmin.ClientType;
050: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
051: import org.omg.CosNotifyChannelAdmin.ConsumerAdminHelper;
052: import org.omg.CosNotifyChannelAdmin.ConsumerAdminOperations;
053: import org.omg.CosNotifyChannelAdmin.ConsumerAdminPOATie;
054: import org.omg.CosNotifyChannelAdmin.ProxyNotFound;
055: import org.omg.CosNotifyChannelAdmin.ProxySupplier;
056: import org.omg.CosNotifyChannelAdmin.ProxySupplierHelper;
057: import org.omg.CosNotifyComm.InvalidEventType;
058: import org.omg.CosNotifyFilter.MappingFilter;
059: import org.omg.CosNotifyFilter.MappingFilterHelper;
060: import org.omg.PortableServer.POA;
061: import org.omg.PortableServer.Servant;
062: import org.picocontainer.MutablePicoContainer;
063: import org.picocontainer.defaults.CachingComponentAdapter;
064:
065: /**
066: * @jmx.mbean extends = "AbstractAdminMBean"
067: * @jboss.xmbean
068: *
069: * @author Alphonse Bendt
070: * @version $Id: ConsumerAdminImpl.java,v 1.11 2006/05/17 13:09:39 alphonse.bendt Exp $
071: */
072:
073: public class ConsumerAdminImpl extends AbstractAdmin implements
074: ConsumerAdminOperations, Disposable, ProxyEventListener,
075: ConsumerAdminImplMBean {
076: private final static class FilterstageWithMessageConsumerComparator
077: implements Comparator {
078: /**
079: * compare two FilterStages via their MessageConsumer.
080: */
081: public int compare(Object l, Object r) {
082: FilterStage left = (FilterStage) l;
083: FilterStage right = (FilterStage) r;
084:
085: return left.getMessageConsumer().compareTo(
086: right.getMessageConsumer());
087: }
088: }
089:
090: private static final FilterstageWithMessageConsumerComparator FILTERSTAGE_COMPARATOR = new FilterstageWithMessageConsumerComparator();
091:
092: private final FilterStageListManager listManager_;
093:
094: private MappingFilter priorityFilter_;
095:
096: private MappingFilter lifetimeFilter_;
097:
098: ////////////////////////////////////////
099:
100: public ConsumerAdminImpl(IEventChannel channelServant, ORB orb,
101: POA poa, Configuration config,
102: MessageFactory messageFactory, OfferManager offerManager,
103: SubscriptionManager subscriptionManager) {
104: super (channelServant, orb, poa, config, messageFactory,
105: offerManager, subscriptionManager);
106:
107: // register core components (factories)
108:
109: listManager_ = new FilterStageListManager() {
110: protected void fetchListData(
111: FilterStageListManager.FilterStageList listProxy) {
112: addAllValues(listProxy, pullServants_);
113:
114: addAllValues(listProxy, pushServants_);
115: }
116:
117: protected void doSortCheckedList(List list) {
118: Collections.sort(list, FILTERSTAGE_COMPARATOR);
119: }
120:
121: private void addAllValues(
122: FilterStageListManager.FilterStageList listProxy,
123: Map map) {
124: for (Iterator i = map.entrySet().iterator(); i
125: .hasNext();) {
126: listProxy.add((FilterStage) ((Map.Entry) i.next())
127: .getValue());
128: }
129: }
130: };
131:
132: lifetimeFilter_ = MappingFilterHelper.unchecked_narrow(getORB()
133: .string_to_object(getORB().object_to_string(null)));
134:
135: priorityFilter_ = MappingFilterHelper.unchecked_narrow(getORB()
136: .string_to_object(getORB().object_to_string(null)));
137:
138: addProxyEventListener(this );
139:
140: ConsumerAdmin _this Ref = ConsumerAdminHelper.narrow(activate());
141:
142: container_.registerComponent(new CachingComponentAdapter(
143: new CORBAObjectComponentAdapter(ConsumerAdmin.class,
144: _this Ref)));
145:
146: registerDisposable(new Disposable() {
147: public void dispose() {
148: container_.unregisterComponent(ConsumerAdmin.class);
149: }
150: });
151: }
152:
153: ////////////////////////////////////////
154:
155: public Servant newServant() {
156: return new ConsumerAdminPOATie(this );
157: }
158:
159: public void subscription_change(EventType[] added,
160: EventType[] removed) throws InvalidEventType {
161: subscriptionManager_.subscription_change(added, removed);
162: }
163:
164: public ProxySupplier get_proxy_supplier(int key)
165: throws ProxyNotFound {
166: return ProxySupplierHelper.narrow(getProxy(key).activate());
167: }
168:
169: public void lifetime_filter(MappingFilter lifetimeFilter) {
170: lifetimeFilter_ = lifetimeFilter;
171: }
172:
173: public MappingFilter lifetime_filter() {
174: return lifetimeFilter_;
175: }
176:
177: public MappingFilter priority_filter() {
178: return priorityFilter_;
179: }
180:
181: public void priority_filter(MappingFilter priorityFilter) {
182: priorityFilter_ = priorityFilter;
183: }
184:
185: public ProxySupplier obtain_notification_pull_supplier(
186: ClientType clientType, IntHolder intHolder)
187: throws AdminLimitExceeded {
188: // may throw AdminLimitExceeded
189: fireCreateProxyRequestEvent();
190:
191: try {
192: final AbstractProxy _servant = obtain_notification_pull_supplier_servant(clientType);
193:
194: intHolder.value = _servant.getID().intValue();
195:
196: return ProxySupplierHelper.narrow(_servant.activate());
197: } catch (Exception e) {
198: logger_
199: .fatalError(
200: "obtain_notification_pull_supplier: unexpected error",
201: e);
202:
203: throw new UNKNOWN(e.toString());
204: }
205: }
206:
207: protected void configureMappingFilters(AbstractProxySupplier servant) {
208: if (lifetimeFilter_ != null) {
209: servant.lifetime_filter(lifetimeFilter_);
210: }
211:
212: if (priorityFilter_ != null) {
213: servant.priority_filter(priorityFilter_);
214: }
215: }
216:
217: private AbstractProxy obtain_notification_pull_supplier_servant(
218: ClientType clientType) throws UnsupportedQoS {
219: final AbstractProxySupplier _servant = newProxyPullSupplier(clientType);
220:
221: configureMappingFilters(_servant);
222:
223: configureQoS(_servant);
224:
225: configureInterFilterGroupOperator(_servant);
226:
227: addProxyToMap(_servant, pullServants_, modifyProxiesLock_);
228:
229: return _servant;
230: }
231:
232: public int[] pull_suppliers() {
233: return get_all_notify_proxies(pullServants_, modifyProxiesLock_);
234: }
235:
236: public int[] push_suppliers() {
237: return get_all_notify_proxies(pushServants_, modifyProxiesLock_);
238: }
239:
240: public ProxySupplier obtain_notification_push_supplier(
241: ClientType clientType, IntHolder intHolder)
242: throws AdminLimitExceeded {
243: // may throw AdminLimitExceeded
244: fireCreateProxyRequestEvent();
245:
246: try {
247: final AbstractProxy _servant = obtain_notification_push_supplier_servant(clientType);
248:
249: intHolder.value = _servant.getID().intValue();
250:
251: return ProxySupplierHelper.narrow(_servant.activate());
252: } catch (Exception e) {
253: logger_
254: .fatalError(
255: "obtain_notification_push_supplier: unexpected error",
256: e);
257:
258: throw new UNKNOWN();
259: }
260: }
261:
262: private AbstractProxy obtain_notification_push_supplier_servant(
263: ClientType clientType) throws UnsupportedQoS {
264: final AbstractProxySupplier _servant = newProxyPushSupplier(clientType);
265:
266: configureMappingFilters(_servant);
267:
268: configureQoS(_servant);
269:
270: configureInterFilterGroupOperator(_servant);
271:
272: addProxyToMap(_servant, pushServants_, modifyProxiesLock_);
273:
274: return _servant;
275: }
276:
277: public ProxyPullSupplier obtain_pull_supplier() {
278: try {
279: MutablePicoContainer _container = newContainerForEventStyleProxy();
280:
281: _container.registerComponentImplementation(
282: AbstractProxy.class, ECProxyPullSupplierImpl.class);
283:
284: AbstractProxy _servant = (AbstractProxy) _container
285: .getComponentInstanceOfType(AbstractProxy.class);
286:
287: configureQoS(_servant);
288:
289: addProxyToMap(_servant, pullServants_, modifyProxiesLock_);
290:
291: return org.omg.CosEventChannelAdmin.ProxyPullSupplierHelper
292: .narrow(_servant.activate());
293: } catch (Exception e) {
294: logger_.fatalError("obtain_pull_supplier: exception", e);
295:
296: throw new UNKNOWN();
297: }
298: }
299:
300: /**
301: * get ProxyPushSupplier (EventStyle)
302: */
303: public ProxyPushSupplier obtain_push_supplier() {
304: try {
305: MutablePicoContainer _container = newContainerForEventStyleProxy();
306:
307: _container.registerComponentImplementation(
308: AbstractProxy.class, ECProxyPushSupplierImpl.class);
309:
310: final AbstractProxy _servant = (AbstractProxy) _container
311: .getComponentInstanceOfType(AbstractProxy.class);
312:
313: configureQoS(_servant);
314:
315: addProxyToMap(_servant, pushServants_, modifyProxiesLock_);
316:
317: return org.omg.CosEventChannelAdmin.ProxyPushSupplierHelper
318: .narrow(_servant.activate());
319: } catch (Exception e) {
320: logger_.fatalError("obtain_push_supplier: exception", e);
321:
322: throw new UNKNOWN(e.toString());
323: }
324: }
325:
326: public List getSubsequentFilterStages() {
327: return listManager_.getList();
328: }
329:
330: /**
331: * ConsumerAdmin never has a MessageConsumer
332: */
333: public MessageConsumer getMessageConsumer() {
334: return null;
335: }
336:
337: /**
338: * ConsumerAdmin never has a MessageConsumer
339: */
340: public boolean hasMessageConsumer() {
341: return false;
342: }
343:
344: public void actionProxyCreationRequest(ProxyEvent event) {
345: // ignored
346: }
347:
348: public void actionProxyDisposed(ProxyEvent event) {
349: listManager_.actionSourceModified();
350: listManager_.refresh();
351: }
352:
353: public void actionProxyCreated(ProxyEvent event) {
354: listManager_.actionSourceModified();
355: }
356:
357: /**
358: * factory method for new ProxyPullSuppliers.
359: */
360: AbstractProxySupplier newProxyPullSupplier(ClientType clientType) {
361: final MutablePicoContainer _containerForProxy = newContainerForNotifyStyleProxy();
362: final Class _proxyClass;
363:
364: switch (clientType.value()) {
365: case ClientType._ANY_EVENT:
366: _proxyClass = ProxyPullSupplierImpl.class;
367:
368: break;
369: case ClientType._STRUCTURED_EVENT:
370: _proxyClass = StructuredProxyPullSupplierImpl.class;
371:
372: break;
373: case ClientType._SEQUENCE_EVENT:
374: _proxyClass = SequenceProxyPullSupplierImpl.class;
375:
376: break;
377: default:
378: throw new BAD_PARAM();
379: }
380:
381: _containerForProxy.registerComponentImplementation(
382: AbstractProxySupplier.class, _proxyClass);
383:
384: final AbstractProxySupplier _servant = (AbstractProxySupplier) _containerForProxy
385: .getComponentInstanceOfType(AbstractProxySupplier.class);
386:
387: return _servant;
388: }
389:
390: /**
391: * factory method for new ProxyPushSuppliers.
392: */
393: AbstractProxySupplier newProxyPushSupplier(ClientType clientType) {
394: final Class _proxyClass;
395:
396: switch (clientType.value()) {
397:
398: case ClientType._ANY_EVENT:
399: _proxyClass = ProxyPushSupplierImpl.class;
400: break;
401:
402: case ClientType._STRUCTURED_EVENT:
403: _proxyClass = StructuredProxyPushSupplierImpl.class;
404: break;
405:
406: case ClientType._SEQUENCE_EVENT:
407: _proxyClass = SequenceProxyPushSupplierImpl.class;
408: break;
409:
410: default:
411: throw new BAD_PARAM("The ClientType: " + clientType.value()
412: + " is unknown");
413: }
414:
415: final MutablePicoContainer _containerForProxy = newContainerForNotifyStyleProxy();
416:
417: _containerForProxy.registerComponentImplementation(
418: AbstractProxySupplier.class, _proxyClass);
419:
420: final AbstractProxySupplier _servant = (AbstractProxySupplier) _containerForProxy
421: .getComponentInstanceOfType(AbstractProxySupplier.class);
422:
423: return _servant;
424: }
425:
426: public String getMBeanType() {
427: return "ConsumerAdmin";
428: }
429: }
|