001: /*
002: * DDS (Data Distribution Service) for JacORB
003: *
004: * Copyright (C) 2005 , Ahmed yehdih <ahmed.yehdih@gmail.com>, fouad
005: allaoui <fouad.allaoui@gmail.com>, Didier Donsez (didier.donsez@ieee.org)
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU Library General Public License
009: * as published by the Free Software Foundation; either version 2
010: * of the License, or (at your option) any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: * GNU Library General Public License for more details.
016: *
017: * You should have received a copy of the GNU Library General Public License
018: * along with this program; if not, write to the Free Software
019: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
020: 02111-1307, USA.
021: *
022: * Contact: Ahmed yehdih <ahmed.yehdih@gmail.com>, fouad allaoui
023: <fouad.allaoui@gmail.com>, Didier Donsez (didier.donsez@ieee.org)
024: * Contributor(s):
025: *
026: **/
027:
028: package org.jacorb.dds;
029:
030: import java.lang.reflect.Method;
031: import java.util.Iterator;
032: import java.util.Vector;
033:
034: import org.omg.CORBA.Any;
035: import org.omg.CosEventChannelAdmin.ConsumerAdmin;
036: import org.omg.CosEventChannelAdmin.EventChannel;
037: import org.omg.CosEventChannelAdmin.EventChannelHelper;
038: import org.omg.CosEventChannelAdmin.ProxyPushSupplier;
039: import org.omg.CosEventComm.PushConsumer;
040: import org.omg.CosEventComm.PushConsumerHelper;
041: import org.omg.CosEventComm.PushConsumerOperations;
042: import org.omg.CosEventComm.PushConsumerPOATie;
043: import org.omg.CosNaming.NamingContextExt;
044: import org.omg.CosNaming.NamingContextExtHelper;
045: import org.omg.dds.DataReader;
046: import org.omg.dds.DomainParticipant;
047: import org.omg.dds.Subscriber;
048: import org.omg.dds.Topic;
049: import org.omg.dds.TopicHelper;
050:
051: public class ThreadSubscriber extends Thread implements
052: PushConsumerOperations {
053:
054: private EventChannel ecs;
055: private ConsumerAdmin ca;
056: private PushConsumer pushConsumer;
057: private ProxyPushSupplier pps;
058: private org.omg.PortableServer.POA poa;
059: private Vector references_domaines_participant;
060: private NamingContextExt nc;
061: //all subscriber interested of topic
062: private Vector all_Sub;
063: private Topic topic;
064: private org.omg.CORBA.ORB orb = null;
065:
066: public ThreadSubscriber(org.omg.CORBA.ORB orb,
067: org.omg.PortableServer.POA poa) {
068: ecs = null;
069: ca = null;
070: pushConsumer = null;
071: pps = null;
072: references_domaines_participant = new Vector();
073: all_Sub = new Vector();
074:
075: try {
076: this .orb = orb;
077: this .poa = poa;
078: NamingContextExt nc = NamingContextExtHelper.narrow(orb
079: .resolve_initial_references("NameService"));
080: ecs = EventChannelHelper.narrow(nc.resolve(nc
081: .to_name("eventchannel")));
082: } catch (Exception e) {
083: e.printStackTrace();
084: }
085:
086: ca = ecs.for_consumers();
087: pps = ca.obtain_push_supplier();
088: }
089:
090: public void disconnect_push_consumer() {
091: System.out.println("Consumer disconnected.");
092: }
093:
094: public void run() {
095:
096: try {
097: PushConsumerPOATie pt = new PushConsumerPOATie(this );
098: pt._this _object(orb);
099: pushConsumer = PushConsumerHelper.narrow(poa
100: .servant_to_reference(pt));
101: pps.connect_push_consumer(pushConsumer);
102: System.out.println("PushConsumerImpl registered.");
103: orb.run();
104: } catch (Exception e) {
105: e.printStackTrace();
106: }
107:
108: System.out.println("Quit.");
109: }
110:
111: /*
112: * lookup the interested dataReader by topic name
113: */
114: public void Lookup_Sub_interesded_of_topic(String topic_name) {
115:
116: org.omg.dds.DomainParticipant domain_temp;
117: DomainParticipantImpl domain_impl;
118: Subscriber sub_temp;
119: SubscriberImpl sub_impl;
120: Iterator _it_DomainParticipant = references_domaines_participant
121: .iterator();
122: Iterator _it_Subscriber;
123:
124: while (_it_DomainParticipant.hasNext()) {
125:
126: try {
127: domain_temp = (DomainParticipant) _it_DomainParticipant
128: .next();
129: domain_impl = (DomainParticipantImpl) poa
130: .reference_to_servant(domain_temp);
131: _it_Subscriber = domain_impl.getVector_Subscriber()
132: .iterator();
133:
134: while (_it_Subscriber.hasNext()) {
135: sub_temp = (Subscriber) _it_Subscriber.next();
136: if (sub_temp.lookup_datareader(topic_name) != null) {
137: all_Sub.add(sub_temp);
138: }
139: }
140: } catch (Exception e) {
141: System.out.println("Exception " + e);
142: e.printStackTrace();
143: }
144: }
145: }
146:
147: public synchronized void push(org.omg.CORBA.Any data)
148: throws org.omg.CosEventComm.Disconnected {
149:
150: Object instance = null;
151: boolean is_topic = false;
152: boolean is_instance = false;
153: Subscriber sub_temp;
154: SubscriberImpl sub_impl_temp;
155: DataReader DR;
156: Class typehelper = null;
157: Class type_param_extract[] = new Class[1];
158: java.lang.Object valu_param_extract[] = new java.lang.Object[1];
159: Iterator It;
160:
161: if (data.type().equal(TopicHelper.type())) {
162: is_topic = true;
163: topic = TopicHelper.extract(data);
164: } else {
165: is_instance = true;
166: valu_param_extract[0] = data;
167: type_param_extract[0] = Any.class;
168:
169: try {
170: typehelper = Class.forName(topic.get_type_name()
171: + "Helper");
172: Method extract = typehelper.getMethod("extract",
173: type_param_extract);
174: instance = extract.invoke(null, valu_param_extract);
175: } catch (Exception e) {
176: e.printStackTrace();
177: }
178: }
179:
180: if (is_topic) {
181: Lookup_Sub_interesded_of_topic(topic.get_name());
182: } else if (is_instance) {
183: try {
184: It = all_Sub.iterator();
185: while (It.hasNext()) {
186: sub_temp = (Subscriber) It.next();
187: sub_impl_temp = (SubscriberImpl) poa
188: .reference_to_servant(sub_temp);
189: sub_impl_temp.setInstance(instance);
190: DR = sub_temp.lookup_datareader(topic.get_name());
191: DR.take_instance_from_subscriber();
192:
193: if (DR.get_listener() != null) {
194: DR.get_listener().on_data_available(DR);
195: }
196: }
197: all_Sub.removeAllElements();
198: } catch (Exception e) {
199: System.out.println("Exep = " + e);
200: e.printStackTrace();
201: }
202: }
203: }
204:
205: /**
206: * @param arg0
207: * @return
208: */
209: public boolean add(Object arg0) {
210:
211: return references_domaines_participant.add(arg0);
212: }
213: }
|