001: package org.jacorb.events;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import org.omg.CosEventChannelAdmin.*;
024: import org.omg.CosEventComm.*;
025: import org.omg.CORBA.*;
026: import org.omg.CosNaming.*;
027: import org.omg.PortableServer.*;
028: import java.util.*;
029: import org.jacorb.orb.*;
030: import java.net.*;
031:
032: /**
033: * Simple implementation of the event channel spec.
034: * The event channel acts as a factory for proxy push/pull consumers/suppliers
035: * and interacts with the implementation objects locally, i.e. using Java
036: * references only.
037: *
038: * @author Joerg v. Frantzius, Rainer Lischetzki, Gerald Brose, Jeff Carlson
039: * @version $Id: EventChannelImpl.java,v 1.10 2004/05/06 12:39:58 nicolas Exp $
040: */
041:
042: public class EventChannelImpl extends JacORBEventChannelPOA {
043: private Vector pullSuppliers = new Vector();
044: private Vector pullConsumers = new Vector();
045: private Vector pushSuppliers = new Vector();
046: private Vector pushConsumers = new Vector();
047: private Vector pendingEvents = new Vector();
048: private org.omg.CORBA.Any nullAny = null;
049:
050: private org.omg.CORBA.ORB myOrb = null;
051: private org.omg.PortableServer.POA myPoa = null;
052:
053: /**
054: * EventChannel constructor.
055: */
056: public EventChannelImpl(org.omg.CORBA.ORB orb,
057: org.omg.PortableServer.POA poa) {
058: myOrb = orb;
059: myPoa = poa;
060:
061: _this _object(myOrb);
062: nullAny = myOrb.create_any();
063: nullAny.type(myOrb.get_primitive_tc(TCKind.tk_null));
064:
065: try {
066: this .myPoa = poa;
067: myPoa.the_POAManager().activate();
068: } catch (Exception e) {
069: e.printStackTrace();
070: }
071: }
072:
073: /**
074: * send the ConsumerAdmin vectors off for destrcution.
075: */
076: private void consumerAdminDestroy() {
077: releaseList(pullSuppliers);
078: releaseList(pushSuppliers);
079: }
080:
081: /**
082: * send the SupplierAdmin vectors off for destrcution.
083: */
084: private void supplierAdminDestroy() {
085: releaseList(pullConsumers);
086: releaseList(pushConsumers);
087: }
088:
089: /**
090: * Iteratre a list and send the servant off to be destroyed.
091: */
092: private void releaseList(Vector list) {
093: for (Enumeration e = list.elements(); e.hasMoreElements();) {
094: org.omg.PortableServer.Servant servant = (org.omg.PortableServer.Servant) e
095: .nextElement();
096: releaseServant(servant);
097: }
098: }
099:
100: /**
101: * Destroy / deactivate the servant.
102: */
103: private void releaseServant(org.omg.PortableServer.Servant servant) {
104: try {
105: servant._poa().deactivate_object(servant._object_id());
106: } catch (org.omg.PortableServer.POAPackage.WrongPolicy wpEx) {
107: wpEx.printStackTrace();
108: } catch (org.omg.PortableServer.POAPackage.ObjectNotActive onaEx) {
109: onaEx.printStackTrace();
110: }
111: }
112:
113: /**
114: * Destroy all objects which are managed by the POA.
115: */
116: public void destroy() {
117: consumerAdminDestroy();
118: supplierAdminDestroy();
119: releaseServant(this );
120: }
121:
122: /**
123: * Return the consumerAdmin interface
124: */
125: public ConsumerAdmin for_consumers() {
126: try {
127: return ConsumerAdminHelper.narrow(myPoa
128: .servant_to_reference(this ));
129: } catch (Exception e) {
130: e.printStackTrace();
131: return null;
132: }
133: }
134:
135: /**
136: * Return the supplierAdmin interface
137: */
138: public SupplierAdmin for_suppliers() {
139: try {
140: return SupplierAdminHelper.narrow(myPoa
141: .servant_to_reference(this ));
142: } catch (Exception e) {
143: e.printStackTrace();
144: return null;
145: }
146: }
147:
148: /**
149: * Return a ProxyPullConsumer reference to be used to connect to a
150: * PullSupplier.
151: */
152: public ProxyPullConsumer obtain_pull_consumer() {
153: synchronized (pullConsumers) {
154: ProxyPullConsumerImpl p = new ProxyPullConsumerImpl(this ,
155: _orb(), myPoa);
156: pullConsumers.addElement(p);
157: return p._this (myOrb);
158: }
159: }
160:
161: /**
162: * Return a ProxyPullSupplier reference to be used to connect to a
163: * PullConsumer.
164: */
165: public ProxyPullSupplier obtain_pull_supplier() {
166: synchronized (pullSuppliers) {
167: ProxyPullSupplierImpl p = new ProxyPullSupplierImpl(this ,
168: _orb(), myPoa);
169: pullSuppliers.addElement(p);
170: return p._this (myOrb);
171: }
172: }
173:
174: /**
175: * Return a ProxyPushConsumer reference to be used to connect to a
176: * PushSupplier.
177: */
178: public ProxyPushConsumer obtain_push_consumer() {
179: synchronized (pushConsumers) {
180: ProxyPushConsumerImpl p = new ProxyPushConsumerImpl(this ,
181: _orb(), myPoa);
182: pushConsumers.addElement(p);
183: return p._this (myOrb);
184: }
185: }
186:
187: /**
188: * Return a ProxyPushSupplier reference to be used to connect to a
189: * PushConsumer.
190: */
191: public ProxyPushSupplier obtain_push_supplier() {
192: synchronized (pushSuppliers) {
193: ProxyPushSupplierImpl p = new ProxyPushSupplierImpl(this ,
194: _orb(), myPoa);
195: pushSuppliers.addElement(p);
196: return p._this (myOrb);
197: }
198: }
199:
200: /**
201: * Send event to all registered consumers.
202: */
203: protected void push_event(org.omg.CORBA.Any event) {
204: ProxyPushSupplierImpl push = null;
205: ProxyPullSupplierImpl pull = null;
206: synchronized (pushSuppliers) {
207: for (int i = (pushSuppliers.size() - 1); i >= 0; --i) {
208: push = (ProxyPushSupplierImpl) pushSuppliers
209: .elementAt(i);
210: try {
211: push.push_to_consumer(event);
212: } catch (org.omg.CORBA.COMM_FAILURE comm) {
213: pushSuppliers.removeElementAt(i);
214: }
215: }
216: }
217:
218: synchronized (pullSuppliers) {
219: for (int i = (pullSuppliers.size() - 1); i >= 0; --i) {
220: pull = (ProxyPullSupplierImpl) pullSuppliers
221: .elementAt(i);
222:
223: try {
224: pull.push_to_supplier(event);
225: } catch (org.omg.CORBA.COMM_FAILURE comm) {
226: pullSuppliers.removeElementAt(i);
227: }
228: }
229: }
230: }
231:
232: static public void main(String[] args) {
233: org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
234: try {
235: org.omg.PortableServer.POA poa = org.omg.PortableServer.POAHelper
236: .narrow(orb.resolve_initial_references("RootPOA"));
237:
238: EventChannelImpl channel = new EventChannelImpl(orb, poa);
239:
240: poa.the_POAManager().activate();
241:
242: org.omg.CORBA.Object o = poa.servant_to_reference(channel);
243:
244: NamingContextExt nc = NamingContextExtHelper.narrow(orb
245: .resolve_initial_references("NameService"));
246:
247: String channelName = (args.length > 0 ? args[0]
248: : "Generic.channel");
249:
250: nc.bind(nc.to_name(channelName), o);
251: orb.run();
252: } catch (Exception e) {
253: e.printStackTrace();
254: }
255: }
256:
257: /**
258: * Override this method from the Servant baseclass. Fintan Bolton
259: * in his book "Pure CORBA" suggests that you override this method to
260: * avoid the risk that a servant object (like this one) could be
261: * activated by the <b>wrong</b> POA object.
262: */
263: public org.omg.PortableServer.POA _default_POA() {
264: return myPoa;
265: }
266: }
|