001: package demo.notification.whiteboard;
002:
003: import java.util.Hashtable;
004: import java.util.Map;
005:
006: import org.omg.CORBA.IntHolder;
007: import org.omg.CORBA.ORB;
008: import org.omg.CosEventChannelAdmin.AlreadyConnected;
009: import org.omg.CosEventComm.Disconnected;
010: import org.omg.CosNotification.EventHeader;
011: import org.omg.CosNotification.EventType;
012: import org.omg.CosNotification.FixedEventHeader;
013: import org.omg.CosNotification.Property;
014: import org.omg.CosNotification.StructuredEvent;
015: import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
016: import org.omg.CosNotifyChannelAdmin.AdminNotFound;
017: import org.omg.CosNotifyChannelAdmin.ClientType;
018: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
019: import org.omg.CosNotifyChannelAdmin.EventChannel;
020: import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator;
021: import org.omg.CosNotifyChannelAdmin.StructuredProxyPushConsumer;
022: import org.omg.CosNotifyChannelAdmin.StructuredProxyPushConsumerHelper;
023: import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
024: import org.omg.CosNotifyComm.InvalidEventType;
025: import org.omg.CosNotifyComm.StructuredPushSupplierOperations;
026: import org.omg.CosNotifyComm.StructuredPushSupplierPOATie;
027: import org.omg.CosNotifyFilter.ConstraintExp;
028: import org.omg.CosNotifyFilter.Filter;
029: import org.omg.CosNotifyFilter.FilterFactory;
030: import org.omg.CosNotifyFilter.InvalidConstraint;
031: import org.omg.CosNotifyFilter.InvalidGrammar;
032:
033: /**
034: * @author Alphonse Bendt
035: * @version $Id: WhiteBoard.java,v 1.5 2006/02/02 19:39:35 phil.mesnier Exp $
036: */
037:
038: public class WhiteBoard extends IWhiteBoardPOA implements
039: IWhiteBoardOperations, WhiteboardVars {
040:
041: // zur Erzeugung einer eindeutigen Id
042: static int COUNT = 0;
043:
044: EventChannel channel_;
045: FilterFactory filterFactory_;
046:
047: // alle angemeldeten Workgroups
048: protected Map workgroups_;
049:
050: Map registrationInfo_ = new Hashtable();
051:
052: // bisher gesammelte Updates
053: // Queue updates_;
054:
055: // Dispatcher Thread
056: Dispatcher disp;
057:
058: // prima"rkopie
059: BrushSizePixelImage image;
060:
061: public WhiteBoard(ORB orb, EventChannel channel)
062: throws AdminLimitExceeded {
063: this (orb, channel, 400, 400);
064: }
065:
066: public WhiteBoard(ORB orb, EventChannel channel, int xsize,
067: int ysize) throws AdminLimitExceeded {
068:
069: System.out.println("WhiteBoard.init()");
070:
071: _this (orb);
072: workgroups_ = new Hashtable();
073: clear();
074: image = new ExtendedPixelImage(xsize, ysize);
075: disp = new Dispatcher(orb, this , channel
076: .default_supplier_admin());
077: // disp.start();
078: System.out.println("done");
079: filterFactory_ = channel.default_filter_factory();
080: channel_ = channel;
081: }
082:
083: public int[] getCurrentImage() {
084: return image.getPixelBuffer();
085: }
086:
087: // Workgroup an Whiteboard anmelden
088: // Id zur Kommunikation mit dem Whiteboard zuru"ck
089: public IRegistrationInfo join(IWorkgroup group) {
090: System.out.println("Workgroup joins the Whiteboard ");
091:
092: Integer _id = new Integer(COUNT++);
093: System.out.println("workgroups: " + workgroups_ + ".put(" + _id
094: + ", " + group);
095:
096: workgroups_.put(_id, group);
097:
098: IRegistrationInfo _registrationInfo = new IRegistrationInfo();
099: _registrationInfo.workgroup_identifier = _id.intValue();
100:
101: IntHolder _supplierAdminId = new IntHolder(), _consumerAdminId = new IntHolder();
102:
103: Filter _filter = null;
104: try {
105: _filter = filterFactory_.create_filter("EXTENDED_TCL");
106:
107: ConstraintExp[] _constraints = new ConstraintExp[1];
108: _constraints[0] = new ConstraintExp();
109: _constraints[0].event_types = new EventType[] { new EventType(
110: EVENT_DOMAIN, "*") };
111: _constraints[0].constraint_expr = "$.header.variable_header("
112: + WORKGROUP_ID + ") != " + _id.intValue();
113:
114: _filter.add_constraints(_constraints);
115:
116: SupplierAdmin _supplierAdmin = channel_.new_for_suppliers(
117: InterFilterGroupOperator.AND_OP, _supplierAdminId);
118:
119: ConsumerAdmin _consumerAdmin = channel_.new_for_consumers(
120: InterFilterGroupOperator.AND_OP, _consumerAdminId);
121: _consumerAdmin.add_filter(_filter);
122:
123: _registrationInfo.supplier_admin = _supplierAdmin;
124: _registrationInfo.consumer_admin = _consumerAdmin;
125: _registrationInfo.filter_factory = filterFactory_;
126:
127: LocalRegistrationInfo _localInfo = new LocalRegistrationInfo();
128: _localInfo.consumerAdmin_ = _consumerAdminId.value;
129: _localInfo.supplierAdmin_ = _supplierAdminId.value;
130:
131: registrationInfo_.put(_id, _localInfo);
132:
133: return _registrationInfo;
134: } catch (InvalidGrammar ig) {
135: ig.printStackTrace();
136: } catch (InvalidConstraint ic) {
137: ic.printStackTrace();
138: }
139: throw new RuntimeException();
140: }
141:
142: public boolean leave(int workgroup) {
143: System.out.println("Bye");
144:
145: Integer _id = new Integer(workgroup);
146:
147: LocalRegistrationInfo _info = (LocalRegistrationInfo) registrationInfo_
148: .get(_id);
149:
150: try {
151: channel_.get_consumeradmin(_info.consumerAdmin_).destroy();
152: channel_.get_supplieradmin(_info.supplierAdmin_).destroy();
153: } catch (AdminNotFound anf) {
154: }
155:
156: return (workgroups_.remove(_id) != null);
157: }
158:
159: // Whiteboard lo"schen
160: public void clear() {
161: // updates_ = new Queue();
162: }
163:
164: } // WhiteBoard
165:
166: class Dispatcher extends Thread implements WhiteboardVars,
167: StructuredPushSupplierOperations {
168:
169: boolean connected_ = false;
170: boolean active_ = true;
171: WhiteBoard master_;
172: StructuredProxyPushConsumer myConsumer_;
173: ORB orb_;
174: IntHolder myConsumerId_ = new IntHolder();
175: static long SLEEP = 10000L;
176:
177: public Dispatcher(ORB orb, WhiteBoard master,
178: SupplierAdmin supplierAdmin) throws AdminLimitExceeded {
179:
180: master_ = master;
181: orb_ = orb;
182: myConsumer_ = StructuredProxyPushConsumerHelper
183: .narrow(supplierAdmin
184: .obtain_notification_push_consumer(
185: ClientType.STRUCTURED_EVENT,
186: myConsumerId_));
187: connected_ = tryConnect();
188: }
189:
190: boolean tryConnect() {
191: long BACKOFF = 100L;
192:
193: for (int x = 0; x < 3; ++x) {
194: try {
195: myConsumer_
196: .connect_structured_push_supplier(new StructuredPushSupplierPOATie(
197: this )._this (orb_));
198: connected_ = true;
199: return true;
200: } catch (AlreadyConnected ac) {
201: }
202:
203: try {
204: Thread.sleep(BACKOFF ^ (x + 1));
205: } catch (InterruptedException ie) {
206: }
207: }
208: return false;
209: }
210:
211: public void shutdown() {
212: active_ = false;
213: interrupt();
214: }
215:
216: public void run() {
217: StructuredEvent _event = new StructuredEvent();
218: _event.header = new EventHeader();
219: _event.header.fixed_header = new FixedEventHeader();
220: _event.header.fixed_header.event_type = new EventType();
221: _event.header.fixed_header.event_type.domain_name = EVENT_DOMAIN;
222: _event.header.fixed_header.event_type.type_name = "UPDATE";
223: _event.header.fixed_header.event_name = "";
224: _event.header.variable_header = new Property[0];
225:
226: _event.filterable_data = new Property[0];
227:
228: while (active_) {
229: try {
230: Thread.sleep(SLEEP);
231: } catch (InterruptedException ie) {
232: if (!active_) {
233: return;
234: }
235: }
236:
237: if (!connected_) {
238: connected_ = tryConnect();
239: if (!connected_) {
240: System.out.println("Giving Up");
241: return;
242: }
243: }
244:
245: try {
246: WhiteboardUpdate _update = new WhiteboardUpdate();
247: _update.image(master_.getCurrentImage());
248: _event.remainder_of_body = orb_.create_any();
249: WhiteboardUpdateHelper.insert(_event.remainder_of_body,
250: _update);
251: myConsumer_.push_structured_event(_event);
252: } catch (Disconnected d) {
253: connected_ = false;
254: }
255: }
256: }
257:
258: public void disconnect_structured_push_supplier() {
259: connected_ = false;
260: }
261:
262: public void subscription_change(EventType[] t1, EventType[] t2)
263: throws InvalidEventType {
264: }
265: }
266:
/**
 * Plain holder for the two admin ids that belong to one registered
 * workgroup. Both values default to 0 until assigned.
 */
class LocalRegistrationInfo {
    /** Id of the workgroup's supplier admin on the event channel. */
    int supplierAdmin_;

    /** Id of the workgroup's consumer admin on the event channel. */
    int consumerAdmin_;
}
|