001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.tools.server;
028:
029: import java.io.IOException;
030: import java.io.InputStream;
031: import java.io.ObjectInputStream;
032: import java.net.InetAddress;
033: import java.net.MalformedURLException;
034: import java.net.ServerSocket;
035: import java.net.Socket;
036: import java.net.SocketTimeoutException;
037: import java.net.URL;
038: import java.util.HashMap;
039: import java.util.Map;
040:
041: /**
042: * Client adapter (server) that listens on a port for URL
043: * connections keeps a map of (path, OutputListener) pairs.
044: * <p>
045: * Spawns multiple threads to listen on the specified port.
046: */
047: public class URLListener {
048:
049: //
050: // FIXME this implementation is "okay" for now.. it has
051: // some threading and socket-usage issues which can be
052: // addressed later.
053: //
054:
055: private final int port;
056: private final int timeoutMillis;
057: private final String urlBase;
058:
059: // map of (id, OutputListener) pairs
060: private final Map m = new HashMap();
061:
062: private boolean running;
063:
064: private ServerSocket serverSock;
065:
066: private final Object lock = new Object();
067:
068: public URLListener(int port) {
069: this (port, -1);
070: }
071:
072: public URLListener(int port, int timeoutMillis) {
073: this .port = port;
074: this .timeoutMillis = timeoutMillis;
075: if (port <= 0) {
076: throw new IllegalArgumentException("Bad port: " + port);
077: }
078: String localHostName;
079: try {
080: localHostName = InetAddress.getLocalHost().getHostName();
081: } catch (Exception e) {
082: throw new RuntimeException(
083: "Unable to get local host address: "
084: + e.getMessage());
085: }
086: this .urlBase = "http://" + localHostName + ":" + port + "/";
087: }
088:
089: /**
090: * Start the server.
091: */
092: public void start() {
093: spawnServer();
094: }
095:
096: public void stop() {
097: synchronized (lock) {
098: haltServer();
099: }
100: }
101:
102: /**
103: * Equivalent to "addListener(id,ol,false)".
104: */
105: public URL addListener(String id, OutputListener ol) {
106: return addListener(id, ol, false);
107: }
108:
109: /**
110: * Add a listener with the given name, returning the URL
111: * for RemoteListenable registration.
112: *
113: * @param override if true then if the name is already taken
114: * the other listener will be removed and the
115: * passed listener will take its place; if false then
116: * a "name already in use" exception is thrown instead.
117: *
118: * @see #start
119: */
120: public URL addListener(String id, OutputListener ol,
121: boolean override) {
122: if ((id == null) || (ol == null)) {
123: throw new NullPointerException();
124: }
125: URL ret;
126: try {
127: ret = new URL(urlBase + id);
128: } catch (MalformedURLException mue) {
129: throw new IllegalArgumentException("Illegal name format \""
130: + id + "\": " + mue.getMessage());
131: }
132: synchronized (lock) {
133: if (m.get(id) != null) {
134: if (override) {
135: // FIXME remove the other listener!
136: // do we need a map of (id, clientSocket) to close?
137: } else {
138: throw new IllegalArgumentException(
139: "Listener name \"" + id
140: + "\" is already in use");
141: }
142: }
143: m.put(id, ol);
144: }
145: return ret;
146: }
147:
148: public void removeListener(String id) {
149: if (id == null) {
150: throw new NullPointerException();
151: }
152: synchronized (lock) {
153: m.remove(id);
154: // FIXME race condition for remove/handle
155: }
156: }
157:
158: //
159: // the rest is protected / private:
160: //
161:
162: private OutputListener getOutputListener(String id) {
163: synchronized (lock) {
164: return (OutputListener) m.get(id);
165: }
166: }
167:
168: protected void scheduleServer(Runnable r) {
169: // FIXME add a thread pool (or use nio?)
170: Thread t = new Thread(r, r.toString());
171: t.start();
172: }
173:
174: protected void scheduleClient(Runnable r) {
175: // FIXME add a thread pool (or use nio?)
176: Thread t = new Thread(r, r.toString());
177: t.start();
178: }
179:
180: protected void spawnServer() {
181: // create runner
182: Runnable r = new Runnable() {
183: public void run() {
184: serve();
185: }
186:
187: public String toString() {
188: return "URL listener for server (" + port + ")";
189: }
190: };
191: scheduleServer(r);
192: }
193:
194: protected void spawnHandler(final Socket clientSock) {
195: // create runner
196: Runnable r = new Runnable() {
197: public void run() {
198: handle(clientSock);
199: }
200:
201: public String toString() {
202: return "URL socket handler for (" + port + ") client: "
203: + clientSock;
204: }
205: };
206: scheduleClient(r);
207: }
208:
209: protected void haltServer() {
210: synchronized (lock) {
211: running = false;
212: if (serverSock != null) {
213: try {
214: serverSock.close();
215: } catch (Exception e) {
216: System.err
217: .println("Unable to cleanly halt server:");
218: e.printStackTrace();
219: }
220: }
221: // wait for server?
222: }
223: }
224:
225: protected final void serve() {
226: synchronized (lock) {
227: if (running) {
228: throw new IllegalStateException(
229: "Server already running");
230: }
231: running = true;
232: }
233: try {
234: serverSock = new ServerSocket(port);
235: if (timeoutMillis > 0) {
236: serverSock.setSoTimeout(timeoutMillis);
237: }
238: while (running) {
239: final Socket clientSock;
240: try {
241: clientSock = serverSock.accept();
242: } catch (SocketTimeoutException ste) {
243: continue;
244: }
245: spawnHandler(clientSock);
246: }
247: } catch (Exception e) {
248: System.err.println("Listener (" + port + ") failure: ");
249: e.printStackTrace();
250: System.err.println("Listener (" + port + ") halting");
251: } finally {
252: haltServer();
253: }
254: }
255:
256: protected final void handle(Socket clientSock) {
257: InputStream is = null;
258: try {
259: if (timeoutMillis > 0) {
260: // FIXME do we want this timeout???
261: clientSock.setSoTimeout(timeoutMillis);
262: }
263: is = clientSock.getInputStream();
264: handleClient(is);
265: } catch (Exception e) {
266: System.err.println("Listener (" + port + ") failure: ");
267: e.printStackTrace();
268: System.err.println("Listener (" + port + ") halting");
269: haltServer();
270: } finally {
271: if (is != null) {
272: try {
273: is.close();
274: } catch (Exception e) {
275: System.err
276: .println("Unable to cleanly close client input stream:");
277: e.printStackTrace();
278: }
279: }
280: try {
281: clientSock.close();
282: } catch (Exception e) {
283: System.err
284: .println("Unable to cleanly close client socket:");
285: e.printStackTrace();
286: }
287: }
288: }
289:
290: private void handleClient(InputStream is) throws Exception {
291: // read header
292: String path = readPath(is);
293: if (path.startsWith("/")) {
294: path = path.substring(1);
295: }
296: String id = path;
297: // find listener
298: //
299: // FIXME maybe find listener per bundle, to allow
300: // for dynamic (client) substitution of listener?
301: OutputListener ol = getOutputListener(id);
302: if (ol == null) {
303: System.err.println("Unknown listener: " + id);
304: }
305: // read bundles
306: ObjectInputStream ois = null;
307: try {
308: ois = new ObjectInputStream(is);
309: while (running) {
310: OutputBundle ob = readOutputBundle(ois);
311: if (ob == null) {
312: break;
313: }
314: ol.handleOutputBundle(ob);
315: }
316: } finally {
317: if (ois != null) {
318: try {
319: ois.close();
320: } catch (Exception e) {
321: System.err
322: .println("Unable to cleanly close client object stream:");
323: e.printStackTrace();
324: }
325: }
326: }
327: }
328:
329: private String readPath(InputStream is) throws IOException {
330: // read method & ' '
331: int i;
332: while (true) {
333: i = is.read();
334: if (i == ' ') {
335: // skip whitespace
336: do {
337: i = is.read();
338: } while (i == ' ');
339: break;
340: }
341: }
342: // read path
343: StringBuffer buf = new StringBuffer(13);
344: while (true) {
345: buf.append((char) i);
346: i = is.read();
347: if (i == ' ') {
348: break;
349: }
350: }
351: String path = buf.toString();
352: // read until end of line
353: // read (optional) non-empty lines
354: // read empty line
355: //
356: // FIXME assume only one line
357: while (true) {
358: i = is.read();
359: if (i == '\r') {
360: is.read(); // \n
361: is.read(); // \r
362: is.read(); // \n
363: break;
364: }
365: }
366: return path;
367: }
368:
369: private OutputBundle readOutputBundle(ObjectInputStream ois)
370: throws IOException {
371: Object o;
372: try {
373: o = ois.readObject();
374: } catch (Exception e) {
375: e.printStackTrace();
376: throw new RuntimeException(e.getMessage());
377: }
378: if (o == null) {
379: return null;
380: }
381: if (!(o instanceof OutputBundle)) {
382: throw new IllegalArgumentException("Not an OutputBundle: "
383: + o.getClass());
384: }
385: OutputBundle ob = (OutputBundle) o;
386: return ob;
387: }
388:
389: public String toString() {
390: return "URLListener on port " + port;
391: }
392: }
|