001: /*
002: * DDS (Data Distribution Service) for JacORB
003: *
004: * Copyright (C) 2005 , Ahmed yehdih <ahmed.yehdih@gmail.com>, fouad
005: * allaoui <fouad.allaoui@gmail.com>, Didier Donsez (didier.donsez@ieee.org)
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU Library General Public License
009: * as published by the Free Software Foundation; either version 2
010: * of the License, or (at your option) any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: * GNU Library General Public License for more details.
016: *
017: * You should have received a copy of the GNU Library General Public
018: * License along with this program; if not, write to the Free Software
019: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
020: * 02111-1307, USA.
021: *
022: * Contact: Ahmed yehdih <ahmed.yehdih@gmail.com>, fouad allaoui
023: * <fouad.allaoui@gmail.com>, Didier Donsez (didier.donsez@ieee.org)
024: * Contributor(s)
025: *
026: **/
027: package demo.dds.dcps.temperaturesample;
028:
029: import org.omg.CORBA.ORB;
030: import org.omg.CosNaming.NamingContextExt;
031: import org.omg.CosNaming.NamingContextExtHelper;
032: import org.omg.PortableServer.POA;
033: import org.omg.PortableServer.POAHelper;
034: import org.omg.dds.DataReader;
035: import org.omg.dds.DataReaderListener;
036: import org.omg.dds.DataReaderListenerHelper;
037: import org.omg.dds.DataReaderQos;
038: import org.omg.dds.DataWriter;
039: import org.omg.dds.DataWriterQos;
040: import org.omg.dds.DeadlineQosPolicy;
041: import org.omg.dds.DestinationOrderQosPolicyKind;
042: import org.omg.dds.DomainParticipant;
043: import org.omg.dds.DomainParticipantFactory;
044: import org.omg.dds.DomainParticipantFactoryHelper;
045: import org.omg.dds.DomainParticipantQos;
046: import org.omg.dds.DurabilityQosPolicy;
047: import org.omg.dds.DurabilityQosPolicyKind;
048: import org.omg.dds.Duration_t;
049: import org.omg.dds.EntityFactoryQosPolicy;
050: import org.omg.dds.GroupDataQosPolicy;
051: import org.omg.dds.LifespanQosPolicy;
052: import org.omg.dds.LivelinessQosPolicyKind;
053: import org.omg.dds.OwnershipStrengthQosPolicy;
054: import org.omg.dds.PartitionQosPolicy;
055: import org.omg.dds.Publisher;
056: import org.omg.dds.PublisherQos;
057: import org.omg.dds.ReliabilityQosPolicyKind;
058: import org.omg.dds.Subscriber;
059: import org.omg.dds.SubscriberQos;
060: import org.omg.dds.TopicDataQosPolicy;
061: import org.omg.dds.TopicQos;
062: import org.omg.dds.TransportPriorityQosPolicy;
063: import org.omg.dds.UserDataQosPolicy;
064:
065: public class TemperatureProducer implements Runnable {
066:
067: private String[] args;
068:
069: public static void main(String[] args) {
070:
071: TemperatureProducer tempProducer = new TemperatureProducer();
072: tempProducer.setArgs(args);
073: new Thread(tempProducer).start();
074: }
075:
076: /**
077: *
078: */
079: public void run() {
080:
081: try {
082: // create and initialize the ORB
083:
084: ORB orb = ORB.init(args, null);
085: POA poa = POAHelper.narrow(orb
086: .resolve_initial_references("RootPOA"));
087: poa.the_POAManager().activate();
088:
089: DomainParticipantFactory domainpartiFactory;
090: DomainParticipant domainparticipant;
091: TemperatureDataWriter temperatureDW;
092: org.omg.dds.Topic topic;
093: Publisher publisher;
094: DataWriter datawriter;
095: PublisherQos publisherqos;
096: DataWriterQos datawriterqos;
097: Subscriber suscriber;
098: DataReader datareader;
099: org.omg.dds.Topic topicMessage;
100: SubscriberQos suscriberqos;
101: DataReaderQos datareaderqos;
102:
103: org.omg.CORBA.Object objRef = orb
104: .resolve_initial_references("NameService");
105: // Use NamingContextExt which is part of the Interoperable
106: // Naming Service (INS) specification.
107: NamingContextExt ncRef = NamingContextExtHelper
108: .narrow(objRef);
109: // resolve the Object Reference in Naming
110: String rname = "DomainParticipantFactory";
111: byte tab[] = new byte[1];
112: tab[0] = 1;
113: org.omg.dds.UserDataQosPolicy UDQP = new UserDataQosPolicy(
114: tab);
115: DomainParticipantQos DPQOS = new DomainParticipantQos(UDQP,
116: new EntityFactoryQosPolicy());
117: domainpartiFactory = DomainParticipantFactoryHelper
118: .narrow(ncRef.resolve_str(rname));
119: TopicQos tq = new TopicQos(new TopicDataQosPolicy(tab),
120: new DurabilityQosPolicy(DurabilityQosPolicyKind
121: .from_int(0), new Duration_t(0, 0)),
122: new DeadlineQosPolicy(new Duration_t(0, 0)),
123: new org.omg.dds.LatencyBudgetQosPolicy(
124: new Duration_t(0, 0)),
125: new org.omg.dds.LivelinessQosPolicy(
126: LivelinessQosPolicyKind.from_int(0),
127: new Duration_t(0, 0)),
128: new org.omg.dds.ReliabilityQosPolicy(
129: ReliabilityQosPolicyKind.from_int(0),
130: new Duration_t(0, 0)),
131: new org.omg.dds.DestinationOrderQosPolicy(
132: DestinationOrderQosPolicyKind.from_int(0)),
133: new org.omg.dds.HistoryQosPolicy(
134: org.omg.dds.HistoryQosPolicyKind
135: .from_int(0), 0),
136: new org.omg.dds.ResourceLimitsQosPolicy(0, 0, 0),
137: new org.omg.dds.TransportPriorityQosPolicy(0),
138: new org.omg.dds.LifespanQosPolicy(new Duration_t(0,
139: 0)), new org.omg.dds.OwnershipQosPolicy(
140: org.omg.dds.OwnershipQosPolicyKind
141: .from_int(0)));
142: domainparticipant = domainpartiFactory.create_participant(
143: 0, DPQOS, null);
144: topic = domainparticipant.create_topic("tamperature",
145: "demo.dds.dcps.temperaturesample.Temperature", tq,
146: null);
147: topicMessage = domainparticipant
148: .create_topic("messsage",
149: "demo.dds.dcps.temperaturesample.Message",
150: tq, null);
151: String st[] = new String[1];
152: st[0] = "";
153: publisherqos = new PublisherQos(
154: new org.omg.dds.PresentationQosPolicy(
155: org.omg.dds.PresentationQosPolicyAccessScopeKind
156: .from_int(0), false, false),
157: new PartitionQosPolicy(st), new GroupDataQosPolicy(
158: tab),
159: new org.omg.dds.EntityFactoryQosPolicy(false));
160: publisher = domainparticipant.create_publisher(
161: publisherqos, null);
162: datawriterqos = new DataWriterQos(new DurabilityQosPolicy(
163: DurabilityQosPolicyKind.from_int(0),
164: new Duration_t(0, 0)), new DeadlineQosPolicy(
165: new Duration_t(0, 0)),
166: new org.omg.dds.LatencyBudgetQosPolicy(
167: new Duration_t(0, 0)),
168: new org.omg.dds.LivelinessQosPolicy(
169: LivelinessQosPolicyKind.from_int(0),
170: new Duration_t(0, 0)),
171: new org.omg.dds.ReliabilityQosPolicy(
172: ReliabilityQosPolicyKind.from_int(0),
173: new Duration_t(0, 0)),
174: new org.omg.dds.DestinationOrderQosPolicy(
175: DestinationOrderQosPolicyKind.from_int(0)),
176: new org.omg.dds.HistoryQosPolicy(
177: org.omg.dds.HistoryQosPolicyKind
178: .from_int(0), 0),
179: new org.omg.dds.ResourceLimitsQosPolicy(0, 0, 0),
180: new TransportPriorityQosPolicy(0),
181: new LifespanQosPolicy(new Duration_t(0, 0)),
182: new org.omg.dds.UserDataQosPolicy(tab),
183: new OwnershipStrengthQosPolicy(0),
184: new org.omg.dds.WriterDataLifecycleQosPolicy(true));
185:
186: datawriter = publisher.create_datawriter(topic,
187: datawriterqos, null);
188: temperatureDW = TemperatureDataWriterHelper
189: .narrow(datawriter);
190: // a producer suscribe for topic message
191: suscriberqos = new SubscriberQos(
192: new org.omg.dds.PresentationQosPolicy(
193: org.omg.dds.PresentationQosPolicyAccessScopeKind
194: .from_int(0), false, false),
195: new PartitionQosPolicy(st), new GroupDataQosPolicy(
196: tab),
197: new org.omg.dds.EntityFactoryQosPolicy(false));
198: suscriber = domainparticipant.create_subscriber(
199: suscriberqos, null);
200: datareaderqos = new DataReaderQos(new DurabilityQosPolicy(
201: DurabilityQosPolicyKind.from_int(0),
202: new Duration_t(0, 0)), new DeadlineQosPolicy(
203: new Duration_t(0, 0)),
204: new org.omg.dds.LatencyBudgetQosPolicy(
205: new Duration_t(0, 0)),
206: new org.omg.dds.LivelinessQosPolicy(
207: LivelinessQosPolicyKind.from_int(0),
208: new Duration_t(0, 0)),
209: new org.omg.dds.ReliabilityQosPolicy(
210: ReliabilityQosPolicyKind.from_int(0),
211: new Duration_t(0, 0)),
212: new org.omg.dds.DestinationOrderQosPolicy(
213: DestinationOrderQosPolicyKind.from_int(0)),
214: new org.omg.dds.HistoryQosPolicy(
215: org.omg.dds.HistoryQosPolicyKind
216: .from_int(0), 0),
217: new org.omg.dds.ResourceLimitsQosPolicy(0, 0, 0),
218: new org.omg.dds.UserDataQosPolicy(tab),
219: new org.omg.dds.TimeBasedFilterQosPolicy(
220: new Duration_t(0, 0)),
221: new org.omg.dds.ReaderDataLifecycleQosPolicy(
222: new Duration_t(0, 0)));
223: datareader = suscriber.create_datareader(topicMessage,
224: datareaderqos, null);
225: MessageDataReader messagedatareader = MessageDataReaderHelper
226: .narrow(datareader);
227: DataReaderListener listener = DataReaderListenerHelper
228: .narrow(poa
229: .servant_to_reference(new MessageDataReaderListenerImpl()));
230: messagedatareader.set_listener(listener, 0);
231: double value = 0;
232: final double MAX = 40;
233: final double MIN = -15;
234: boolean direction = true;
235: Temperature temperature;
236: double random;
237: while (true) {
238: temperature = new Temperature(value, 0, Unit.Celsius,
239: (int) System.currentTimeMillis());
240: temperatureDW.write(temperature, 0);
241: random = Math.random() * 5;
242: if (direction) {
243: if (value >= MAX)
244: direction = false;
245: value += random;
246: } else {
247: if (value <= MIN) {
248: direction = true;
249: }
250: value -= random;
251: ;
252: }
253: Thread.currentThread().sleep(500);
254:
255: }
256: } catch (Exception e) {
257: System.out.println(" ERROR : " + e);
258: e.printStackTrace();
259: }
260: }
261:
262: /**
263: * @param args The args to set.
264: */
265: public void setArgs(String[] args) {
266: this .args = args;
267: }
268:
269: public void end() {
270: Thread.currentThread().destroy();
271: }
272: }
|