001: /*
002: * DDS (Data Distribution Service) for JacORB
003: *
004: * Copyright (C) 2005 , Ahmed yehdih <ahmed.yehdih@gmail.com>, fouad
005: * allaoui <fouad.allaoui@gmail.com>, Didier Donsez (didier.donsez@ieee.org)
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU Library General Public License
009: * as published by the Free Software Foundation; either version 2
010: * of the License, or (at your option) any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: * GNU Library General Public License for more details.
016: *
017: * You should have received a copy of the GNU Library General Public
018: * License along with this program; if not, write to the Free Software
019: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
020: * 02111-1307, USA.
021: *
022: * Contact: Ahmed yehdih <ahmed.yehdih@gmail.com>, fouad allaoui
023: * <fouad.allaoui@gmail.com>, Didier Donsez (didier.donsez@ieee.org)
024: * Contributor(s)
025: *
026: **/
027: package org.jacorb.dds;
028:
029: import java.lang.reflect.Method;
030: import java.util.Iterator;
031: import java.util.Vector;
032:
033: import org.omg.dds.DataReader;
034: import org.omg.dds.DataReaderListener;
035: import org.omg.dds.DataReaderQos;
036: import org.omg.dds.DataReaderQosHolder;
037: import org.omg.dds.DataReaderSeqHolder;
038: import org.omg.dds.DomainParticipant;
039: import org.omg.dds.RETCODE_OK;
040: import org.omg.dds.RETCODE_PRECONDITION_NOT_MET;
041: import org.omg.dds.StatusCondition;
042: import org.omg.dds.Subscriber;
043: import org.omg.dds.SubscriberListener;
044: import org.omg.dds.SubscriberPOA;
045: import org.omg.dds.SubscriberQos;
046: import org.omg.dds.SubscriberQosHolder;
047: import org.omg.dds.Topic;
048: import org.omg.dds.TopicDescription;
049: import org.omg.dds.TopicHelper;
050: import org.omg.dds.TopicQos;
051: import org.omg.PortableServer.Servant;
052:
053: /**
054: * A Subscriber is the object responsible for the actual reception of the data resulting
055: * from its subscriptions. A Subscriber acts on the behalf of one or several DataReader
056: * objects that are related to it. When it receives data (from the other parts of the
057: * system), it builds the list of concerned DataReader objects, and then indicates to
058: * the application that data is available, through its listener or by enabling related
059: * conditions. The application can access the list of concerned DataReader objects
060: * through the operation get_datareaders and then access the data available through
061: * operations on the DataReader.
062: */
063: public class SubscriberImpl extends SubscriberPOA {
064:
065: private org.omg.CORBA.ORB orb;
066: private org.omg.PortableServer.POA poa;
067: private Vector Vector_DataReaders;
068: private SubscriberQos qos;
069: private DomainParticipant DP_Parent;
070: private DataReaderQos Default_DataReaderqos;
071: // represent the data wrote by the datawriter
072: private Object instance;
073: private SubscriberListener listner;
074:
075: /**
076: * @param qos
077: * @param listner
078: * @param DP
079: */
080: public SubscriberImpl(SubscriberQos qos,
081: SubscriberListener listner, DomainParticipant DP) {
082: this .qos = qos;
083: this .listner = listner;
084: Vector_DataReaders = new Vector();
085: this .DP_Parent = DP;
086: }
087:
088: /**
089: * @param a_topic
090: * @param qos
091: * @param a_listener
092: * @return
093: */
094: public DataReader create_datareader(TopicDescription a_topic,
095: DataReaderQos qos, DataReaderListener a_listener) {
096: DataReader DR = null;
097: Servant impl;
098:
099: try {
100:
101: Class type = Class.forName(a_topic.get_type_name()
102: + "DataReaderImpl");
103: Class typehelper = Class.forName(a_topic.get_type_name()
104: + "DataReaderHelper");
105: Class type_param_constructor[] = new Class[6];
106: Object valu_param_constructor[] = new Object[6];
107: type_param_constructor[0] = DataReaderQos.class;
108: type_param_constructor[1] = DataReaderListener.class;
109: type_param_constructor[2] = Subscriber.class;
110: type_param_constructor[3] = Topic.class;
111: type_param_constructor[4] = org.omg.CORBA.ORB.class;
112: type_param_constructor[5] = org.omg.PortableServer.POA.class;
113: valu_param_constructor[0] = qos;
114: valu_param_constructor[1] = a_listener;
115: valu_param_constructor[2] = this ._this ();
116: valu_param_constructor[3] = TopicHelper.narrow(a_topic);
117: valu_param_constructor[4] = orb;
118: valu_param_constructor[5] = poa;
119: impl = (Servant) type
120: .getConstructor(type_param_constructor)
121: .newInstance(valu_param_constructor);
122:
123: org.omg.CORBA.Object oref = poa.servant_to_reference(impl);
124: Class type_param_narrow[] = new Class[1];
125: org.omg.CORBA.Object valu_param_narrow[] = new org.omg.CORBA.Object[1];
126: valu_param_narrow[0] = oref;
127: type_param_narrow[0] = Class
128: .forName("org.omg.CORBA.Object");
129: Method Narrow = typehelper.getMethod("narrow",
130: type_param_narrow);
131: DR = (DataReader) Narrow.invoke(null, valu_param_narrow);
132: add(DR);
133:
134: } catch (Exception e) {
135: System.out.println("Eroor " + e);
136: e.printStackTrace();
137: }
138:
139: return DR;
140: }
141:
142: /**
143: * @param a_datareader
144: * @return
145: */
146: public int delete_datareader(DataReader a_datareader) {
147: if (_this () == a_datareader.get_subscriber()) {
148: remove(a_datareader);
149: return RETCODE_OK.value;
150: } else
151: return RETCODE_PRECONDITION_NOT_MET.value;
152: }
153:
154: /**
155: * Not Implemented
156: * @return
157: */
158: public int delete_contained_entities() {
159: return 0;
160: }
161:
162: /**
163: * @param topic_name
164: * @return
165: */
166: public DataReader lookup_datareader(String topic_name) {
167: Iterator It = Vector_DataReaders.iterator();
168: DataReader temp;
169:
170: while (It.hasNext()) {
171: temp = (DataReader) It.next();
172: if (temp.get_topicdescription().get_name().equals(
173: topic_name)) {
174: return temp;
175: }
176: }
177:
178: return null;
179: }
180:
181: /**
182: * @param readers
183: * @param sample_states
184: * @param view_states
185: * @param instance_states
186: * @return
187: */
188: public int get_datareaders(DataReaderSeqHolder readers,
189: int sample_states, int view_states, int instance_states) {
190: readers.value = (DataReader[]) getVector_DataReaders()
191: .toArray();
192:
193: return RETCODE_OK.value;
194: }
195:
196: /**
197: * @param
198: * @return
199: */
200: public void notify_datareaders() {
201: Iterator It = Vector_DataReaders.iterator();
202: DataReader temp;
203:
204: while (It.hasNext()) {
205: temp = (DataReader) It.next();
206: temp.get_listener().on_data_available(temp);
207: }
208: }
209:
210: /**
211: * @param qos
212: * @return
213: */
214: public int set_qos(SubscriberQos qos) {
215: this .qos = qos;
216:
217: return RETCODE_OK.value;
218: }
219:
220: /**
221: * @param qos
222: */
223: public void get_qos(SubscriberQosHolder qos) {
224: qos.value = this .qos;
225: }
226:
227: /**
228: * @param a_listener
229: * @param mask
230: * @return
231: */
232: public int set_listener(SubscriberListener a_listener, int mask) {
233: this .listner = a_listener;
234:
235: return RETCODE_OK.value;
236: }
237:
238: /**
239: * @return
240: */
241: public SubscriberListener get_listener() {
242: return listner;
243: }
244:
245: /**
246: * Not Implemented
247: * @return
248: */
249: public int begin_access() {
250: return 0;
251: }
252:
253: /**
254: * Not Implemented
255: * @return
256: */
257: public int end_access() {
258: return 0;
259: }
260:
261: /**
262: * @return
263: */
264: public DomainParticipant get_participant() {
265: return getDP_Parent();
266: }
267:
268: /**
269: * @param qos
270: * @return
271: */
272: public int set_default_datareader_qos(DataReaderQos qos) {
273: this .Default_DataReaderqos = qos;
274: return RETCODE_OK.value;
275: }
276:
277: /**
278: * @param qos
279: */
280: public void get_default_datareader_qos(DataReaderQosHolder qos) {
281: qos.value = this .Default_DataReaderqos;
282:
283: }
284:
285: /**
286: * @param a_datareader_qos
287: * @param a_topic_qos
288: * @return
289: */
290: public int copy_from_topic_qos(
291: DataReaderQosHolder a_datareader_qos, TopicQos a_topic_qos) {
292:
293: a_datareader_qos.value.deadline = a_topic_qos.deadline;
294: a_datareader_qos.value.destination_order = a_topic_qos.destination_order;
295: a_datareader_qos.value.durability = a_topic_qos.durability;
296: a_datareader_qos.value.history = a_topic_qos.history;
297: a_datareader_qos.value.latency_budget = a_topic_qos.latency_budget;
298: a_datareader_qos.value.liveliness = a_topic_qos.liveliness;
299: a_datareader_qos.value.reliability = a_topic_qos.reliability;
300: a_datareader_qos.value.resource_limits = a_topic_qos.resource_limits;
301:
302: return RETCODE_OK.value;
303: }
304:
305: /**
306: * Not Implemented
307: * @return
308: */
309: public int enable() {
310: return 0;
311: }
312:
313: /**
314: * Not Implemented
315: * @return
316: */
317: public StatusCondition get_statuscondition() {
318: return null;
319: }
320:
321: /**
322: * Not Implemented
323: * @return
324: */
325: public int get_status_changes() {
326: return 0;
327: }
328:
329: /**
330: * Not Implemented
331: * @return
332: */
333: public boolean isDeletable() {
334: return getVector_DataReaders().isEmpty();
335: }
336:
337: /**
338: * @return Returns the vector_DataReaders.
339: */
340: public Vector getVector_DataReaders() {
341: return Vector_DataReaders;
342: }
343:
344: /**
345: * @param arg0
346: * @return
347: */
348: public boolean add(DataReader DR) {
349: return Vector_DataReaders.add(DR);
350: }
351:
352: /**
353: * @param arg0
354: * @return
355: */
356: public boolean remove(DataReader DR) {
357: return Vector_DataReaders.remove(DR);
358: }
359:
360: /**
361: * @return Returns the dP_Parent.
362: */
363: public DomainParticipant getDP_Parent() {
364: return DP_Parent;
365: }
366:
367: /**
368: * @param orb The orb to set.
369: */
370: public void setORB(org.omg.CORBA.ORB orb) {
371: this .orb = orb;
372: }
373:
374: /**
375: * @param poa The poa to set.
376: */
377: public void setPOA(org.omg.PortableServer.POA poa) {
378: this .poa = poa;
379: }
380:
381: /**
382: * @return Returns the instance.
383: */
384: public Object getInstance() {
385: return instance;
386: }
387:
388: /**
389: * @param instance The instance to set.
390: */
391: public void setInstance(Object instance) {
392: this.instance = instance;
393:
394: }
395: }
|