001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.wp.bootstrap.multicast;
028:
029: import java.io.BufferedReader;
030: import java.io.ByteArrayInputStream;
031: import java.io.InputStream;
032: import java.io.InputStreamReader;
033: import java.io.Reader;
034: import java.net.DatagramPacket;
035: import java.net.DatagramSocket;
036: import java.net.InetAddress;
037: import java.net.MulticastSocket;
038: import java.net.URI;
039: import java.util.Collection;
040: import java.util.HashMap;
041: import java.util.Iterator;
042: import java.util.Map;
043: import java.util.regex.Matcher;
044: import java.util.regex.Pattern;
045: import org.cougaar.bootstrap.SystemProperties;
046: import org.cougaar.core.component.ServiceBroker;
047: import org.cougaar.core.component.ServiceRevokedListener;
048: import org.cougaar.core.service.LoggingService;
049: import org.cougaar.core.service.ThreadService;
050: import org.cougaar.core.service.wp.AddressEntry;
051: import org.cougaar.core.thread.Schedulable;
052: import org.cougaar.core.thread.SchedulableStatus;
053: import org.cougaar.core.wp.bootstrap.Bundle;
054: import org.cougaar.core.wp.bootstrap.ConfigService;
055: import org.cougaar.core.wp.bootstrap.DiscoveryBase;
056: import org.cougaar.core.wp.bootstrap.DiscoveryService;
057:
058: /**
059: * This component discovers bundles by sending a UDP multicast and
060: * listening for UDP replies.
061: */
062: public class MulticastDiscovery extends DiscoveryBase {
063: private ConfigService configService;
064:
065: private final ConfigService.Client configClient = new ConfigService.Client() {
066: public void add(Bundle b) {
067: addPoller(getBootEntry(b));
068: }
069:
070: public void change(Bundle b) {
071: add(b);
072: }
073:
074: public void remove(Bundle b) {
075: removePoller(getBootEntry(b));
076: }
077: };
078:
079: protected String getConfigPrefix() {
080: return "org.cougaar.core.wp.resolver.multicast.";
081: }
082:
083: public void load() {
084: super .load();
085:
086: configService = (ConfigService) sb.getService(configClient,
087: ConfigService.class, null);
088: if (configService == null) {
089: throw new RuntimeException("Unable to obtain ConfigService");
090: }
091: }
092:
093: public void unload() {
094: if (configService != null) {
095: sb.releaseService(configClient, ConfigService.class,
096: configService);
097: configService = null;
098: }
099:
100: super .unload();
101: }
102:
103: protected AddressEntry getBootEntry(Bundle b) {
104: return MulticastUtil.getBootEntry(b);
105: }
106:
107: protected Map lookup(Object bootObj) {
108: throw new InternalError("should use MulticastPoller!");
109: }
110:
111: protected Poller createPoller(Object bootObj) {
112: return new MulticastPoller(bootObj);
113: }
114:
115: private class MulticastPoller extends Poller {
116:
117: private static final int LISTEN_TIMEOUT = 60000;
118: private static final int PACKET_SIZE = 2048;
119:
120: private final AddressEntry bootEntry;
121:
122: private MulticastSocket sendSoc;
123:
124: private final Object lock = new Object();
125: private boolean pleaseStop;
126: private boolean running;
127: private String listenHost;
128: private int listenPort;
129:
130: private final Schedulable listenThread;
131:
132: public MulticastPoller(Object bootObj) {
133: super (bootObj);
134:
135: bootEntry = (AddressEntry) bootObj;
136:
137: Runnable listenRunner = new Runnable() {
138: public void run() {
139: runListener();
140: }
141: };
142: listenThread = threadService.getThread(
143: MulticastDiscovery.this , listenRunner,
144: "White pages bootstrap reply listener for "
145: + bootObj, ThreadService.WILL_BLOCK_LANE);
146: }
147:
148: public void start() {
149: ds.update(null);
150: synchronized (lock) {
151: if (running) {
152: return;
153: }
154: pleaseStop = false;
155: }
156: listenThread.start();
157: }
158:
159: public void stop() {
160: if (sendSoc != null) {
161: try {
162: sendSoc.close();
163: } catch (Exception e) {
164: if (log.isWarnEnabled()) {
165: log.warn("Unable to close " + sendSoc
166: + " socket", e);
167: }
168: }
169: }
170:
171: synchronized (lock) {
172: if (!running) {
173: return;
174: }
175: pleaseStop = true;
176: }
177: }
178:
179: public void doLookup() {
180: // no lookup in this thread, but we send our ARP here.
181: // The reply will arrive in our listenThread.
182: sendQuery();
183: }
184:
185: // our listenThread
186: private void runListener() {
187: if (log.isDebugEnabled()) {
188: log.debug("Starting listener " + bootEntry);
189: }
190:
191: // create reply socket
192: DatagramSocket listenSoc;
193: String host;
194: int port;
195: try {
196: listenSoc = new DatagramSocket();
197: if (LISTEN_TIMEOUT > 0) {
198: listenSoc.setSoTimeout(LISTEN_TIMEOUT);
199: }
200: host = listenSoc.getLocalAddress().getHostAddress();
201: port = listenSoc.getLocalPort();
202: if ("0.0.0.0".equals(host)) {
203: host = InetAddress.getLocalHost().getHostAddress();
204: }
205: } catch (Exception e) {
206: if (log.isErrorEnabled()) {
207: log.error("Unable to open listener socket", e);
208: }
209: return;
210: }
211:
212: // let our send thread see the reply-to address
213: if (log.isInfoEnabled()) {
214: log.info("Listening on " + host + ":" + port
215: + " for replies to " + bootObj);
216: }
217: synchronized (lock) {
218: running = true;
219: listenHost = host;
220: listenPort = port;
221: }
222:
223: // listen for replies
224: DatagramPacket packet = new DatagramPacket(
225: new byte[PACKET_SIZE], PACKET_SIZE);
226: while (true) {
227: synchronized (lock) {
228: if (pleaseStop) {
229: break;
230: }
231: }
232: try {
233: try {
234: SchedulableStatus.beginNetIO("UDP listen");
235: listenSoc.receive(packet);
236: } finally {
237: SchedulableStatus.endBlocking();
238: }
239: } catch (Exception e) {
240: if (log.isErrorEnabled()) {
241: log.error("Exception on receive to " + host
242: + ":" + port, e);
243: }
244: continue;
245: }
246: Map bundles = readReply(packet.getData(), 0, packet
247: .getLength());
248: if (bundles == null || bundles.isEmpty()) {
249: continue;
250: }
251: for (Iterator iter = bundles.values().iterator(); iter
252: .hasNext();) {
253: Bundle b = (Bundle) iter.next();
254: ds.add(b.getName(), b);
255: }
256: }
257:
258: // shutdown
259: try {
260: listenSoc.close();
261: } catch (Exception e) {
262: if (log.isWarnEnabled()) {
263: log.warn(
264: "Unable to close " + listenSoc + " socket",
265: e);
266: }
267: }
268: synchronized (lock) {
269: running = false;
270: pleaseStop = false;
271: listenHost = null;
272: listenPort = 0;
273: }
274:
275: if (log.isDebugEnabled()) {
276: log.debug("Stopped listener " + bootEntry);
277: }
278: }
279:
280: private Map readReply(byte[] bytes, int offset, int length) {
281: //(Cougaar-RARP from=HOST:PORT bundles=\n
282: //name=X ..\n
283: //name=Y ..\n
284: //..\n
285: //)
286:
287: if (log.isInfoEnabled()) {
288: URI uri = bootEntry.getURI();
289: log.info("Reading reply from " + uri.getHost() + ":"
290: + uri.getPort() + " "
291: + new String(bytes, offset, length));
292: }
293:
294: InputStream is = new ByteArrayInputStream(bytes, offset,
295: length);
296:
297: // parse bundles
298: Map newFound = null;
299: try {
300: BufferedReader br = new BufferedReader(
301: new InputStreamReader(is));
302: String header = br.readLine();
303: if (header == null) {
304: if (log.isErrorEnabled()) {
305: log.error("Missing header");
306: }
307: return null;
308: }
309: String sp = "^\\s*" + "\\(" + "\\s*" + "Cougaar-RARP"
310: + "\\s+" + "from=([^\\s:]+):(\\d+)" + "\\s+"
311: + "bundles=" + "\\s*" + "$";
312: Pattern p = Pattern.compile(sp);
313: Matcher m = p.matcher(header);
314: if (!m.matches()) {
315: if (log.isErrorEnabled()) {
316: log.error("Invalid wp-reply header: " + header);
317: }
318: return null;
319: }
320: String host = m.group(1);
321: String sport = m.group(2);
322: int port = Integer.parseInt(sport);
323:
324: // record this host:port?
325: if (log.isInfoEnabled()) {
326: URI uri = bootEntry.getURI();
327: log.info("Reply to " + uri.getHost() + ":"
328: + uri.getPort() + " is from " + host + ":"
329: + port);
330: }
331:
332: newFound = Bundle.decodeAll(br);
333:
334: br.close();
335: } catch (Exception e) {
336: if (log.isInfoEnabled()) {
337: log.info("Unable to parse reply", e);
338: }
339: return null;
340: }
341:
342: return newFound;
343: }
344:
345: private void sendQuery() {
346: // create socket
347: if (sendSoc == null || sendSoc.isClosed()) {
348: try {
349: sendSoc = new MulticastSocket();
350: } catch (Exception e) {
351: if (log.isInfoEnabled()) {
352: log.info("Unable to open multicast socket", e);
353: }
354: return;
355: }
356: }
357:
358: // get reply address
359: String replyHost;
360: int replyPort;
361: synchronized (lock) {
362: if (!running) {
363: return;
364: }
365: replyHost = listenHost;
366: replyPort = listenPort;
367: }
368:
369: // create message
370: String msg = "(Cougaar-ARP reply-to=" + replyHost + ":"
371: + replyPort + ")";
372:
373: URI uri = bootEntry.getURI();
374: String host = uri.getHost();
375: int port = uri.getPort();
376:
377: if (log.isInfoEnabled()) {
378: log.info("Sending " + msg + " to " + host + ":" + port);
379: }
380:
381: // send multicast
382: try {
383: byte[] b = msg.getBytes();
384: InetAddress ia = InetAddress.getByName(host);
385: DatagramPacket packet = new DatagramPacket(b, b.length,
386: ia, port);
387: sendSoc.send(packet);
388: } catch (Exception e) {
389: if (log.isInfoEnabled()) {
390: log.info("Multicast Cougaar-ARP to " + host + ":"
391: + port + " failed", e);
392: }
393: }
394:
395: // our reply will arrive in the listenThread!
396: }
397: }
398: }
|