001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.wp.bootstrap.multicast;
028:
029: import java.io.BufferedReader;
030: import java.io.ByteArrayInputStream;
031: import java.io.InputStream;
032: import java.io.InputStreamReader;
033: import java.io.Reader;
034: import java.net.DatagramPacket;
035: import java.net.DatagramSocket;
036: import java.net.InetAddress;
037: import java.net.MulticastSocket;
038: import java.net.SocketTimeoutException;
039: import java.net.URI;
040: import java.util.Collection;
041: import java.util.Iterator;
042: import java.util.Map;
043: import java.util.regex.Matcher;
044: import java.util.regex.Pattern;
045: import org.cougaar.core.component.ServiceBroker;
046: import org.cougaar.core.component.ServiceRevokedListener;
047: import org.cougaar.core.service.LoggingService;
048: import org.cougaar.core.service.ThreadService;
049: import org.cougaar.core.service.wp.AddressEntry;
050: import org.cougaar.core.thread.Schedulable;
051: import org.cougaar.core.thread.SchedulableStatus;
052: import org.cougaar.core.wp.bootstrap.AdvertiseBase;
053: import org.cougaar.core.wp.bootstrap.Bundle;
054: import org.cougaar.core.wp.bootstrap.ConfigService;
055:
056: /**
057: * This component advertises bundles by listening for UDP multicast
058: * requests and replying by UDP.
059: * <p>
060: * It looks in the {@link ConfigService} for config entries of type
061: * "-MULTICAST_REG" and scheme "multicast", or "-MCAST_REG" and
062: * "mcast", e.g.<pre>
063: * X={-MULTICAST_REG=multicast://224.22.165.34:7777}
064: * </pre>
065: * and creates a UDP multicast listener for that group:port and
066: * replies to<pre>
067: * (Cougaar-ARP reply-to=HOST:PORT)
068: * </pre>
069: * requests with text-encoded bundles tracked by the {@link
070: * org.cougaar.core.wp.bootstrap.AdvertiseService} (i.e. locally bound
071: * leases), e.g.<pre>
072: * (Cougaar-RARP from=test.com:9876 bundles:\
073: * A={-RMI=rmi://host:port/objId,\
074: * }\
075: * #)
076: * </pre>
077: * <p>
078: * Note that there's currently no filtering of the local leases.
079: */
080: public class MulticastAdvertise extends AdvertiseBase {
081:
082: private ConfigService configService;
083:
084: private final ConfigService.Client configClient = new ConfigService.Client() {
085: public void add(Bundle b) {
086: addAdvertiser(getBootEntry(b));
087: }
088:
089: public void change(Bundle b) {
090: add(b);
091: }
092:
093: public void remove(Bundle b) {
094: removeAdvertiser(getBootEntry(b));
095: }
096: };
097:
098: public void load() {
099: super .load();
100:
101: configService = (ConfigService) sb.getService(configClient,
102: ConfigService.class, null);
103: if (configService == null) {
104: throw new RuntimeException("Unable to obtain ConfigService");
105: }
106: }
107:
108: public void unload() {
109: if (configService != null) {
110: sb.releaseService(configClient, ConfigService.class,
111: configService);
112: configService = null;
113: }
114:
115: super .unload();
116: }
117:
118: protected AddressEntry getBootEntry(Bundle b) {
119: AddressEntry entry = MulticastUtil.getBootEntry(b);
120: if (entry == null) {
121: return null;
122: }
123: // assume we're it?
124: return entry;
125: }
126:
127: protected Advertiser createAdvertiser(Object bootObj) {
128: return new MulticastAdvertiser(bootObj);
129: }
130:
131: private class MulticastAdvertiser extends Advertiser implements
132: Runnable {
133:
134: private static final int LISTEN_TIMEOUT = 60000;
135: private static final int PACKET_SIZE = 512;
136:
137: private final AddressEntry bootEntry;
138:
139: private final Schedulable listenThread;
140:
141: private final Object lock = new Object();
142: private boolean pleaseStop;
143: private boolean running;
144:
145: public MulticastAdvertiser(Object bootObj) {
146: super (bootObj);
147:
148: bootEntry = (AddressEntry) bootObj;
149:
150: Runnable listenRunner = new Runnable() {
151: public void run() {
152: runListener();
153: }
154: };
155: listenThread = threadService.getThread(
156: MulticastAdvertise.this , listenRunner,
157: "White pages bootstrap query listener for "
158: + bootObj, ThreadService.WILL_BLOCK_LANE);
159: }
160:
161: public void start() {
162: synchronized (lock) {
163: if (running) {
164: return;
165: }
166: pleaseStop = false;
167: }
168: listenThread.start();
169: }
170:
171: public void update(String name, Bundle bundle) {
172: // do nothing, since we server bundles (as opposed to posting
173: // them at external server)
174: }
175:
176: public void stop() {
177: synchronized (lock) {
178: if (!running) {
179: return;
180: }
181: pleaseStop = true;
182: }
183: }
184:
185: // our listenThread
186: private void runListener() {
187: if (log.isDebugEnabled()) {
188: log.debug("Starting listener " + bootEntry);
189: }
190:
191: URI uri = bootEntry.getURI();
192: String listenHost = uri.getHost();
193: int listenPort = uri.getPort();
194:
195: InetAddress listenAddr;
196: try {
197: listenAddr = InetAddress.getByName(listenHost);
198: } catch (Exception e) {
199: if (log.isErrorEnabled()) {
200: log.error(
201: "Unable to getByName(" + listenHost + ")",
202: e);
203: }
204: return;
205: }
206:
207: // create multicast listener socket
208: MulticastSocket listenSoc;
209: try {
210: listenSoc = new MulticastSocket(listenPort);
211: if (LISTEN_TIMEOUT > 0) {
212: listenSoc.setSoTimeout(LISTEN_TIMEOUT);
213: }
214: listenSoc.joinGroup(listenAddr);
215: } catch (Exception e) {
216: if (log.isErrorEnabled()) {
217: log.error("Unable to create multicast socket on "
218: + uri, e);
219: }
220: return;
221: }
222:
223: // create datagram reply socket
224: DatagramSocket replySoc;
225: InetAddress replyAddr;
226: int replyPort;
227: try {
228: replySoc = new DatagramSocket();
229: replyAddr = replySoc.getLocalAddress();
230: replyPort = replySoc.getLocalPort();
231: } catch (Exception e) {
232: if (log.isErrorEnabled()) {
233: log.error("Unable to create reply socket", e);
234: }
235: try {
236: listenSoc.leaveGroup(listenAddr);
237: listenSoc.close();
238: } catch (Exception e2) {
239: if (log.isWarnEnabled()) {
240: log.warn("Unable to close " + listenSoc
241: + " socket", e2);
242: }
243: }
244: return;
245: }
246:
247: // let our send thread see the reply-to address
248: if (log.isInfoEnabled()) {
249: log.info("Listening on " + listenHost + ":"
250: + listenPort
251: + " for multicast queries, will reply on "
252: + replyAddr + ":" + replyPort);
253: }
254: synchronized (lock) {
255: running = true;
256: }
257:
258: // listen for queries
259: DatagramPacket packet = new DatagramPacket(
260: new byte[PACKET_SIZE], PACKET_SIZE);
261: while (true) {
262: synchronized (lock) {
263: if (pleaseStop) {
264: break;
265: }
266: }
267: try {
268: try {
269: SchedulableStatus
270: .beginNetIO("Multicast listen");
271: listenSoc.receive(packet);
272: } finally {
273: SchedulableStatus.endBlocking();
274: }
275: String replyTo = readQuery(packet.getData(), 0,
276: packet.getLength());
277: if (replyTo == null) {
278: continue;
279: }
280: sendReply(replySoc, replyTo);
281: } catch (SocketTimeoutException ste) {
282: if (log.isDebugEnabled()) {
283: log
284: .debug("Ignore socket timeout, keep listening...");
285: }
286: } catch (Exception e) {
287: if (log.isErrorEnabled()) {
288: log.error("Failed query receive", e);
289: }
290: }
291: }
292:
293: // shutdown
294: try {
295: listenSoc.leaveGroup(listenAddr);
296: listenSoc.close();
297: } catch (Exception e) {
298: if (log.isWarnEnabled()) {
299: log.warn(
300: "Unable to close " + listenSoc + " socket",
301: e);
302: }
303: }
304: synchronized (lock) {
305: running = false;
306: pleaseStop = false;
307: }
308:
309: if (log.isDebugEnabled()) {
310: log.debug("Stopped listener " + bootEntry);
311: }
312: }
313:
314: private String readQuery(byte[] bytes, int offset, int length) {
315: //(Cougaar-ARP reply-to=HOST:PORT)
316:
317: if (log.isInfoEnabled()) {
318: log.info("Reading query to " + bootEntry.getURI()
319: + ": " + new String(bytes, offset, length));
320: }
321:
322: InputStream is = new ByteArrayInputStream(bytes, offset,
323: length);
324:
325: // parse query
326: try {
327: BufferedReader br = new BufferedReader(
328: new InputStreamReader(is));
329: String header = br.readLine();
330: if (header == null) {
331: if (log.isErrorEnabled()) {
332: log.error("Missing header");
333: }
334: return null;
335: }
336: String sp = "^\\s*" + "\\(" + "\\s*" + "Cougaar-ARP"
337: + "\\s+" + "reply-to=([^\\s:]+):(\\d+)"
338: + "\\s*" + "\\)" + "\\s*" + "$";
339: Pattern p = Pattern.compile(sp);
340: Matcher m = p.matcher(header);
341: if (!m.matches()) {
342: if (log.isErrorEnabled()) {
343: log.error("Invalid wp-query header: " + header);
344: }
345: return null;
346: }
347: String host = m.group(1);
348: String sport = m.group(2);
349: int port = Integer.parseInt(sport);
350:
351: return host + ":" + port;
352: } catch (Exception e) {
353: if (log.isInfoEnabled()) {
354: log.info("readQuery failed", e);
355: }
356: }
357:
358: return null;
359: }
360:
361: private void sendReply(DatagramSocket replySoc, String replyTo) {
362: int sep = replyTo.indexOf(':');
363: String host = replyTo.substring(0, sep);
364: String sport = replyTo.substring(sep + 1);
365: int port = Integer.parseInt(sport);
366:
367: if (host.equals("0.0.0.0")) {
368: if (log.isInfoEnabled()) {
369: log.info("Ignoring request from invalid reply-to="
370: + host + ":" + port);
371: }
372: return;
373: }
374: // assume that the return address is valid!
375:
376: String msg = getReply();
377:
378: if (log.isInfoEnabled()) {
379: log.info("Sending reply to " + replyTo + ": " + msg);
380: }
381:
382: try {
383: byte[] b = msg.getBytes();
384: InetAddress ia = InetAddress.getByName(host);
385: DatagramPacket packet = new DatagramPacket(b, b.length,
386: ia, port);
387: replySoc.send(packet);
388: } catch (Exception e) {
389: if (log.isErrorEnabled()) {
390: log.error("Failed reply to " + replyTo, e);
391: }
392: }
393: }
394:
395: private boolean validateReplyTo(String host, int port) {
396: boolean ret = true;
397:
398: // check for dummy address
399: if (ret && host.equals("0.0.0.0")) {
400: ret = false;
401: }
402:
403: // assume that the return address is valid!
404:
405: if (!ret) {
406: if (log.isInfoEnabled()) {
407: log.info("Ignoring request from invalid reply-to="
408: + host + ":" + port);
409: }
410: }
411:
412: return ret;
413: }
414:
415: private String getReply() {
416: Map bundles = getBundles();
417:
418: StringBuffer buf = new StringBuffer();
419: buf.append("(Cougaar-RARP from=");
420: URI uri = bootEntry.getURI();
421: buf.append(uri.getHost());
422: buf.append(":");
423: buf.append(uri.getPort());
424: buf.append(" bundles=\n");
425: if (bundles != null) {
426: for (Iterator iter = bundles.values().iterator(); iter
427: .hasNext();) {
428: Bundle b = (Bundle) iter.next();
429: String s = b.encode();
430: if (s == null) {
431: continue;
432: }
433: buf.append(s).append("\n");
434: }
435: }
436: buf.append("#)");
437: String msg = buf.toString();
438: return msg;
439: }
440:
441: private Map getBundles() {
442: // serve remote caller
443: Map ret = MulticastAdvertise.this .getBundles();
444: // filter?
445: if (log.isDebugEnabled()) {
446: log.debug("Serving bundles: " + ret);
447: }
448: return ret;
449: }
450:
451: }
452: }
|