001: package org.jacorb.notification;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2003 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import java.io.FileWriter;
024: import java.io.IOException;
025: import java.io.PrintWriter;
026: import java.io.Writer;
027: import java.util.ArrayList;
028: import java.util.Iterator;
029: import java.util.List;
030: import java.util.Map;
031: import java.util.Properties;
032:
033: import org.apache.avalon.framework.configuration.Configuration;
034: import org.apache.avalon.framework.configuration.ConfigurationException;
035: import org.apache.avalon.framework.logger.Logger;
036: import org.jacorb.notification.conf.Attributes;
037: import org.jacorb.notification.container.BiDirGiopPOAComponentAdapter;
038: import org.jacorb.notification.container.PicoContainerFactory;
039: import org.jacorb.notification.interfaces.Disposable;
040: import org.jacorb.notification.lifecycle.ManageableServant;
041: import org.jacorb.notification.util.AdminPropertySet;
042: import org.jacorb.notification.util.DisposableManager;
043: import org.jacorb.notification.util.PropertySet;
044: import org.jacorb.notification.util.QoSPropertySet;
045: import org.omg.CORBA.Any;
046: import org.omg.CORBA.IntHolder;
047: import org.omg.CORBA.ORB;
048: import org.omg.CORBA.UserException;
049: import org.omg.CosNaming.NameComponent;
050: import org.omg.CosNaming.NamingContext;
051: import org.omg.CosNaming.NamingContextHelper;
052: import org.omg.CosNaming.NamingContextPackage.CannotProceed;
053: import org.omg.CosNaming.NamingContextPackage.InvalidName;
054: import org.omg.CosNaming.NamingContextPackage.NotFound;
055: import org.omg.CosNotification.BestEffort;
056: import org.omg.CosNotification.ConnectionReliability;
057: import org.omg.CosNotification.EventReliability;
058: import org.omg.CosNotification.Persistent;
059: import org.omg.CosNotification.Property;
060: import org.omg.CosNotification.PropertyError;
061: import org.omg.CosNotification.PropertyRange;
062: import org.omg.CosNotification.QoSError_code;
063: import org.omg.CosNotification.UnsupportedAdmin;
064: import org.omg.CosNotification.UnsupportedQoS;
065: import org.omg.CosNotifyChannelAdmin.ChannelNotFound;
066: import org.omg.PortableServer.IdAssignmentPolicyValue;
067: import org.omg.PortableServer.POA;
068: import org.omg.PortableServer.POAHelper;
069: import org.omg.PortableServer.Servant;
070: import org.picocontainer.MutablePicoContainer;
071:
072: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
073:
074: /**
075: * @author Alphonse Bendt
076: * @version $Id: AbstractChannelFactory.java,v 1.22 2006/06/22 09:51:04 alphonse.bendt Exp $
077: */
078:
079: public abstract class AbstractChannelFactory implements
080: ManageableServant, Disposable {
081: interface ShutdownCallback {
082: void needTime(int time);
083:
084: void shutdownComplete();
085: }
086:
087: // //////////////////////////////////////
088:
089: private static final String STANDARD_IMPL_NAME = "JacORB-NotificationService";
090:
091: private static final long SHUTDOWN_INTERVAL = 1000;
092:
093: private static final String EVENTCHANNEL_FACTORY_POA_NAME = "EventChannelFactoryPOA";
094:
095: // //////////////////////////////////////
096:
097: private NameComponent[] registeredName_ = null;
098:
099: private NamingContext namingContext_;
100:
101: /**
102: * the method that is executed when destroy is invoked.
103: */
104: private Runnable destroyMethod_ = new Runnable() {
105: public void run() {
106: dispose();
107: }
108: };
109:
110: // ///////
111:
112: protected final MutablePicoContainer container_;
113:
114: protected final Configuration config_;
115:
116: protected final org.omg.CORBA.Object this Ref_;
117:
118: protected final Logger logger_;
119:
120: private final String ior_;
121:
122: private final String corbaLoc_;
123:
124: private final POA eventChannelFactoryPOA_;
125:
126: private final byte[] oid_;
127:
128: private final ChannelManager channelManager_ = new ChannelManager();
129:
130: private final AtomicInteger eventChannelIDPool_ = new AtomicInteger(
131: 0);
132:
133: private final DisposableManager disposableManager_ = new DisposableManager();
134:
135: // //////////////////////////////////////
136:
137: protected AbstractChannelFactory(
138: final MutablePicoContainer container, final ORB orb)
139: throws UserException {
140: container_ = PicoContainerFactory.createRootContainer(
141: container, (org.jacorb.orb.ORB) orb);
142:
143: if (container != null) {
144: disposableManager_.addDisposable(new Disposable() {
145: public void dispose() {
146: container.removeChildContainer(container_);
147: }
148: });
149: }
150:
151: disposableManager_.addDisposable(new Disposable() {
152: public void dispose() {
153: final POA _poa = (POA) container_
154: .getComponentInstanceOfType(POA.class);
155:
156: _poa.destroy(true, false);
157: }
158: });
159:
160: config_ = (Configuration) container_
161: .getComponentInstanceOfType(Configuration.class);
162:
163: logger_ = ((org.jacorb.config.Configuration) config_)
164: .getNamedLogger(getClass().getName());
165:
166: POA _rootPOA = (POA) container_
167: .getComponentInstanceOfType(POA.class);
168:
169: List _ps = new ArrayList();
170:
171: _ps
172: .add(_rootPOA
173: .create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID));
174:
175: BiDirGiopPOAComponentAdapter.addBiDirGiopPolicy(_ps, orb,
176: config_);
177:
178: org.omg.CORBA.Policy[] _policies = (org.omg.CORBA.Policy[]) _ps
179: .toArray(new org.omg.CORBA.Policy[_ps.size()]);
180:
181: eventChannelFactoryPOA_ = _rootPOA.create_POA(
182: EVENTCHANNEL_FACTORY_POA_NAME, _rootPOA
183: .the_POAManager(), _policies);
184:
185: for (int x = 0; x < _policies.length; ++x) {
186: _policies[x].destroy();
187: }
188:
189: oid_ = (getObjectName().getBytes());
190:
191: eventChannelFactoryPOA_.activate_object_with_id(oid_,
192: getServant());
193:
194: this Ref_ = eventChannelFactoryPOA_.id_to_reference(oid_);
195:
196: if (logger_.isDebugEnabled()) {
197: logger_.debug("activated EventChannelFactory with OID '"
198: + new String(oid_) + "' on '"
199: + eventChannelFactoryPOA_.the_name() + "'");
200: }
201:
202: ior_ = orb.object_to_string(eventChannelFactoryPOA_
203: .id_to_reference(oid_));
204:
205: corbaLoc_ = createCorbaLoc();
206:
207: ((org.jacorb.orb.ORB) orb).addObjectKey(getShortcut(), ior_);
208: }
209:
210: // //////////////////////////////////////
211:
212: protected abstract AbstractEventChannel newEventChannel()
213: throws ConfigurationException;
214:
215: protected abstract org.omg.CORBA.Object create_abstract_channel(
216: Property[] admin, Property[] qos, IntHolder id)
217: throws UnsupportedAdmin, UnsupportedQoS;
218:
219: protected abstract String getObjectName();
220:
221: protected abstract String getShortcut();
222:
223: protected abstract Servant getServant();
224:
225: // //////////////////////////////////////
226:
227: protected int getLocalPort() {
228: org.jacorb.orb.ORB jorb = (org.jacorb.orb.ORB) getORB();
229:
230: return jorb.getBasicAdapter().getPort();
231: }
232:
233: protected String getLocalAddress() {
234: org.jacorb.orb.ORB jorb = (org.jacorb.orb.ORB) getORB();
235:
236: return jorb.getBasicAdapter().getAddress();
237: }
238:
239: private String createCorbaLoc() {
240: StringBuffer _corbaLoc = new StringBuffer("corbaloc::");
241:
242: _corbaLoc.append(getLocalAddress());
243: _corbaLoc.append(":");
244: _corbaLoc.append(getLocalPort());
245: _corbaLoc.append("/");
246: _corbaLoc.append(getShortcut());
247:
248: return _corbaLoc.toString();
249: }
250:
251: public synchronized org.omg.CORBA.Object activate() {
252: return this Ref_;
253: }
254:
255: public void setDestroyMethod(Runnable destroyMethod) {
256: destroyMethod_ = destroyMethod;
257: }
258:
259: protected ORB getORB() {
260: return (ORB) container_.getComponentInstance(ORB.class);
261: }
262:
263: public final void deactivate() {
264: try {
265: eventChannelFactoryPOA_.deactivate_object(oid_);
266: } catch (Exception e) {
267: logger_.fatalError("unable to deactivate object", e);
268:
269: throw new RuntimeException();
270: }
271: }
272:
273: protected Configuration getConfiguration() {
274: return config_;
275: }
276:
277: public void dispose() {
278: try {
279: unregisterName();
280: } catch (Exception e) {
281: logger_.error(
282: "unable to unregister NameService registration", e);
283: }
284:
285: deactivate();
286:
287: channelManager_.dispose();
288:
289: container_.dispose();
290:
291: disposableManager_.dispose();
292: }
293:
294: protected void addToChannels(int id, AbstractEventChannel channel) {
295: channelManager_.add_channel(id, channel);
296: }
297:
298: protected int[] getAllChannels() {
299: return channelManager_.get_all_channels();
300: }
301:
302: protected AbstractEventChannel get_event_channel_servant(int id)
303: throws ChannelNotFound {
304: return channelManager_.get_channel_servant(id);
305: }
306:
307: protected Iterator getChannelIterator() {
308: return channelManager_.getChannelIterator();
309: }
310:
311: protected AbstractEventChannel create_channel_servant(IntHolder id,
312: Property[] qosProps, Property[] adminProps)
313: throws UnsupportedAdmin, UnsupportedQoS,
314: ConfigurationException {
315: // check QoS and Admin Settings
316:
317: AdminPropertySet _adminSettings = new AdminPropertySet(config_);
318:
319: _adminSettings.set_admin(adminProps);
320:
321: QoSPropertySet _qosSettings = new QoSPropertySet(config_,
322: QoSPropertySet.CHANNEL_QOS);
323:
324: _qosSettings.set_qos(qosProps);
325:
326: if (logger_.isDebugEnabled()) {
327: logger_.debug("uniqueQoSProps: " + _qosSettings);
328: logger_.debug("uniqueAdminProps: " + _adminSettings);
329: }
330:
331: checkQoSSettings(_qosSettings);
332:
333: AbstractEventChannel _eventChannelServant = newEventChannel();
334:
335: id.value = _eventChannelServant.getID();
336:
337: _eventChannelServant.set_qos(_qosSettings.toArray());
338: _eventChannelServant.set_admin(_adminSettings.toArray());
339:
340: channelCreated(_eventChannelServant);
341:
342: if (logger_.isDebugEnabled()) {
343: logger_.debug("created channel_servant id=" + id.value);
344: }
345:
346: return _eventChannelServant;
347: }
348:
349: protected void channelCreated(AbstractEventChannel channel) {
350: // empty
351: }
352:
353: private int createChannelIdentifier() {
354: return eventChannelIDPool_.getAndIncrement();
355: }
356:
357: private void checkQoSSettings(PropertySet uniqueQoSProperties)
358: throws UnsupportedQoS {
359: if (uniqueQoSProperties.containsKey(EventReliability.value)) {
360: short _eventReliabilty = uniqueQoSProperties.get(
361: EventReliability.value).extract_short();
362:
363: switch (_eventReliabilty) {
364: case BestEffort.value:
365: logger_.info("EventReliability=BestEffort");
366: break;
367:
368: case Persistent.value:
369: throwPersistentNotSupported(EventReliability.value);
370:
371: // fallthrough
372: default:
373: throwBadValue(EventReliability.value);
374: }
375: }
376:
377: short _connectionReliability = BestEffort.value;
378:
379: if (uniqueQoSProperties
380: .containsKey(ConnectionReliability.value)) {
381: _connectionReliability = uniqueQoSProperties.get(
382: ConnectionReliability.value).extract_short();
383:
384: switch (_connectionReliability) {
385: case BestEffort.value:
386: logger_.info("ConnectionReliability=BestEffort");
387: break;
388:
389: case Persistent.value:
390: throwPersistentNotSupported(ConnectionReliability.value);
391:
392: break; // to satisfy compiler
393: default:
394: throwBadValue(ConnectionReliability.value);
395: }
396: }
397: }
398:
399: private void throwPersistentNotSupported(String property)
400: throws UnsupportedQoS {
401: Any _lowVal = getORB().create_any();
402: Any _highVal = getORB().create_any();
403:
404: _lowVal.insert_short(BestEffort.value);
405: _highVal.insert_short(BestEffort.value);
406:
407: UnsupportedQoS _e = new UnsupportedQoS(
408: new PropertyError[] { new PropertyError(
409: QoSError_code.UNSUPPORTED_VALUE, property,
410: new PropertyRange(_lowVal, _highVal)) });
411:
412: throw _e;
413: }
414:
415: private void throwBadValue(String property) throws UnsupportedQoS {
416: Any _lowVal = getORB().create_any();
417: Any _highVal = getORB().create_any();
418:
419: _lowVal.insert_short(BestEffort.value);
420: _highVal.insert_short(BestEffort.value);
421:
422: UnsupportedQoS _e = new UnsupportedQoS(
423: "The specified Property Value is not supported",
424: new PropertyError[] { new PropertyError(
425: QoSError_code.BAD_VALUE, property,
426: new PropertyRange(_lowVal, _highVal)) });
427: throw _e;
428: }
429:
430: public void destroy() {
431: // start extra thread to
432: // shut down the Notification Service.
433: // otherwise ORB.shutdown() would be called inside
434: // a remote invocation which causes an exception.
435: final Thread _shutdown = new Thread() {
436: public void run() {
437: try {
438: logger_
439: .info("Notification Service is going down in "
440: + SHUTDOWN_INTERVAL + " ms");
441:
442: Thread.sleep(SHUTDOWN_INTERVAL);
443: } catch (InterruptedException e) {
444: // ignore
445: }
446:
447: destroyMethod_.run();
448: }
449: };
450:
451: _shutdown.start();
452: }
453:
454: /**
455: * shutdown is called by the Java Wrapper
456: */
457: public void shutdown(ShutdownCallback cb) {
458: // estimate shutdown time.
459: // during shutdown disconnect must be called on every
460: // connected client. in worst case the client is not
461: // acccessible anymore and disconnect raises TRANSIENT. as
462: // this could take some time request some more time from the
463: // WrapperManager who is initiating the shutdown.
464:
465: int _numberOfClients = 0;
466:
467: Iterator i = getChannelIterator();
468:
469: while (i.hasNext()) {
470: AbstractEventChannel _channel = (AbstractEventChannel) ((Map.Entry) i
471: .next()).getValue();
472:
473: _numberOfClients += _channel.getNumberOfConnectedClients();
474: }
475:
476: // TODO fetch this from somewhere?
477: int _connectionTimeout = 4000;
478:
479: int _estimatedShutdowntime = 2000 + _numberOfClients
480: * _connectionTimeout;
481:
482: if (logger_.isInfoEnabled()) {
483: logger_.info("Connected Clients: " + _numberOfClients);
484: logger_.info("Connection Timeout: " + _connectionTimeout
485: + " ms");
486: logger_.info("Estimated Shutdowntime: "
487: + _estimatedShutdowntime + " ms");
488: }
489:
490: // estimate 4000ms shutdowntime per channel
491: cb.needTime(_estimatedShutdowntime);
492:
493: logger_.info("NotificationService is going down");
494:
495: destroyMethod_.run();
496:
497: logger_.info("NotificationService down");
498:
499: cb.shutdownComplete();
500: }
501:
502: public String getIOR() {
503: return ior_;
504: }
505:
506: public String getCorbaLoc() {
507: return corbaLoc_;
508: }
509:
510: private static AbstractChannelFactory newChannelFactory(
511: MutablePicoContainer container, ORB orb, boolean typed)
512: throws UserException {
513: if (typed) {
514: return new TypedEventChannelFactoryImpl(container, orb);
515: }
516:
517: return new EventChannelFactoryImpl(container, orb);
518: }
519:
520: private static AbstractChannelFactory newFactory(
521: MutablePicoContainer container, final ORB orb,
522: boolean startThread, Properties props) throws Exception {
523: AbstractChannelFactory _factory = newChannelFactory(container,
524: orb, "on".equals(props
525: .get(Attributes.ENABLE_TYPED_CHANNEL)));
526:
527: // force activation
528: _factory.activate();
529:
530: _factory.printIOR(props);
531:
532: _factory.printCorbaLoc(props);
533:
534: _factory.writeIOR(props);
535:
536: _factory.registerName(props);
537:
538: _factory.startChannels(props);
539:
540: if (startThread) {
541: POA _poa = POAHelper.narrow(orb
542: .resolve_initial_references("RootPOA"));
543: _poa.the_POAManager().activate();
544:
545: Thread _orbThread = new Thread(new Runnable() {
546: public void run() {
547: orb.run();
548: }
549: });
550:
551: _orbThread.setName("Notification ORB Runner Thread");
552:
553: _orbThread.setDaemon(false);
554:
555: _orbThread.start();
556:
557: _factory.disposableManager_.addDisposable(new Disposable() {
558: public void dispose() {
559: orb.shutdown(false);
560: }
561: });
562: }
563:
564: return _factory;
565: }
566:
567: public static AbstractChannelFactory newFactory(ORB optionalORB,
568: MutablePicoContainer optionalContainer, Properties props)
569: throws Exception {
570: props.put("jacorb.implname", STANDARD_IMPL_NAME);
571:
572: final ORB _orb;
573: if (optionalORB != null) {
574: _orb = optionalORB;
575: } else {
576: _orb = ORB.init(new String[] {}, props);
577: }
578:
579: AbstractChannelFactory factory = newFactory(optionalContainer,
580: _orb, (optionalORB == null), props);
581:
582: return factory;
583: }
584:
585: public static AbstractChannelFactory newFactory(Properties props)
586: throws Exception {
587: return newFactory(null, null, props);
588: }
589:
590: public void registerName(Properties props) throws Exception {
591: registerName(props.getProperty(Attributes.REGISTER_NAME_ID,
592: null), props.getProperty(Attributes.REGISTER_NAME_KIND,
593: ""));
594: }
595:
596: public synchronized void registerName(String nameId, String nameKind)
597: throws NotFound, CannotProceed, InvalidName,
598: org.omg.CORBA.ORBPackage.InvalidName {
599: if (nameId == null) {
600: return;
601: }
602:
603: namingContext_ = NamingContextHelper.narrow(getORB()
604: .resolve_initial_references("NameService"));
605:
606: NameComponent[] _name = new NameComponent[] { new NameComponent(
607: nameId, nameKind) };
608:
609: if (logger_.isInfoEnabled()) {
610: logger_.info("namingContext.rebind(" + format(_name)
611: + " => " + getCorbaLoc() + ")");
612: }
613:
614: namingContext_.rebind(_name, this Ref_);
615:
616: registeredName_ = _name;
617: }
618:
619: public synchronized void unregisterName() throws NotFound,
620: CannotProceed, InvalidName {
621: if (namingContext_ != null) {
622: if (registeredName_ != null) {
623: if (logger_.isInfoEnabled()) {
624: logger_.info("namingContext.unbind("
625: + format(registeredName_) + ")");
626: }
627:
628: namingContext_.unbind(registeredName_);
629:
630: registeredName_ = null;
631: }
632: }
633: }
634:
635: private static String format(NameComponent[] name) {
636: StringBuffer b = new StringBuffer();
637:
638: for (int i = 0; i < name.length; ++i) {
639: if (i != 0) {
640: b.append('/');
641: }
642:
643: format(name[i], b);
644: }
645:
646: return b.toString();
647: }
648:
649: private static void format(NameComponent name, StringBuffer b) {
650: b.append(name.id);
651:
652: if (name.kind != null) {
653: b.append('.');
654: b.append(name.kind);
655: }
656: }
657:
658: private void startChannels(Properties props) throws UnsupportedQoS,
659: UnsupportedAdmin {
660: if (props.containsKey(Attributes.START_CHANNELS)) {
661: startChannels(Integer.parseInt((String) props
662: .get(Attributes.START_CHANNELS)));
663: }
664: }
665:
666: private void startChannels(int channels) throws UnsupportedQoS,
667: UnsupportedAdmin {
668: for (int i = 0; i < channels; i++) {
669: IntHolder ih = new IntHolder();
670: create_abstract_channel(new Property[0], new Property[0],
671: ih);
672: }
673: }
674:
675: private void printIOR(Properties props) {
676: if ("on".equals(props.get(Attributes.PRINT_IOR))) {
677: System.out.println(getIOR());
678: }
679: }
680:
681: private void printCorbaLoc(Properties props) {
682: if ("on".equals(props.get(Attributes.PRINT_CORBALOC))) {
683: System.out.println(getCorbaLoc());
684: }
685: }
686:
687: private void writeIOR(Properties props) throws IOException {
688: String _iorFileName = (String) props.get(Attributes.IOR_FILE);
689:
690: if (_iorFileName != null) {
691: writeIOR(_iorFileName);
692: }
693: }
694:
695: public void writeIOR(String fileName) throws IOException {
696: FileWriter out = new FileWriter(fileName);
697:
698: try {
699: writeIOR(out);
700: out.flush();
701: } finally {
702: out.close();
703: }
704: }
705:
706: private void writeIOR(Writer out) {
707: PrintWriter writer = new PrintWriter(out);
708: writer.println(getIOR());
709: }
710:
711: public POA _default_POA() {
712: return eventChannelFactoryPOA_;
713: }
714:
715: protected MutablePicoContainer newContainerForChannel() {
716: final MutablePicoContainer _channelContainer = PicoContainerFactory
717: .createChildContainer(container_);
718:
719: // create identifier
720: final int _channelID = createChannelIdentifier();
721: IFactory _factory = new IFactory() {
722: public MutablePicoContainer getContainer() {
723: return _channelContainer;
724: }
725:
726: public int getChannelID() {
727: return _channelID;
728: }
729:
730: public void destroy() {
731: container_.removeChildContainer(_channelContainer);
732: }
733: };
734:
735: _channelContainer.registerComponentInstance(IFactory.class,
736: _factory);
737: return _channelContainer;
738: }
739: }
|