001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2007 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.lib.web.micro.mts;
028:
029: import java.io.ByteArrayInputStream;
030: import java.io.ByteArrayOutputStream;
031: import java.io.InputStream;
032: import java.io.IOException;
033: import java.io.OutputStream;
034: import java.io.Serializable;
035: import java.util.HashMap;
036: import java.util.List;
037: import java.util.Map;
038: import javax.servlet.Servlet;
039: import javax.servlet.ServletException;
040:
041: import org.cougaar.core.agent.service.MessageSwitchService;
042: import org.cougaar.core.mts.Message;
043: import org.cougaar.core.mts.MessageAddress;
044: import org.cougaar.core.mts.MessageHandler;
045: import org.cougaar.core.service.LoggingService;
046: import org.cougaar.core.service.ThreadService;
047: import org.cougaar.core.thread.Schedulable;
048: import org.cougaar.core.util.UID;
049: import org.cougaar.lib.web.micro.base.AnnotatedInputStream;
050: import org.cougaar.lib.web.micro.base.AnnotatedOutputStream;
051: import org.cougaar.lib.web.micro.base.Connection;
052: import org.cougaar.lib.web.micro.base.ServerFactory;
053:
054: /**
055: * A connection factory for creating server accepts, such as to support
056: * our {@link MessagingServletEngine}.
057: */
058: public class MessagingServerFactory implements ServerFactory {
059:
060: /** A {@link PipeMessage} tag for messages we should receive */
061: public static final String TYPE = "server";
062:
063: private final LoggingService log;
064: private final ThreadService threadService;
065: private final MessageSwitchService msgSwitch;
066: private final long nagle;
067:
068: private AcceptCallback callback;
069:
070: // map from session UID to Pipe
071: private final Map input_pipes = new HashMap();
072:
073: // todo for processing input messages
074: private final TodoQueue todo;
075:
076: /**
077: * @param nagle see {@link OutputPipe}
078: */
079: public MessagingServerFactory(LoggingService log,
080: ThreadService threadService,
081: MessageSwitchService msgSwitch, long nagle) {
082: this .log = log;
083: this .threadService = threadService;
084: this .msgSwitch = msgSwitch;
085: this .nagle = nagle;
086:
087: String s = (log == null ? "log"
088: : threadService == null ? "threadService"
089: : msgSwitch == null ? "msgSwitch" : null);
090: if (s != null) {
091: throw new IllegalArgumentException("null " + s);
092: }
093:
094: // this thread won't block
095: this .todo = new TodoQueue(log, threadService,
096: "mts engine receiver", ThreadService.BEST_EFFORT_LANE) {
097: protected void doNow(Object o) {
098: handleMessage((PipeMessage) o);
099: }
100: };
101: }
102:
103: public void start() {
104: MessageHandler handler = new MessageHandler() {
105: public boolean handleMessage(Message m) {
106: if (!(m instanceof PipeMessage))
107: return false;
108: PipeMessage pm = (PipeMessage) m;
109: if (!TYPE.equals(pm.getType()))
110: return false;
111: // switch threads
112: todo.add(pm);
113: return true;
114: }
115: };
116: msgSwitch.addMessageHandler(handler);
117: }
118:
119: public void stop() {
120: // no "msgSwitch.removeMessageHandler(handler)" method!
121: }
122:
123: public ListenerControl listen(Map settings, AcceptCallback cb) {
124: if (callback != null) {
125: throw new IllegalStateException("Already have a callback");
126: }
127:
128: this .callback = cb;
129: return new ListenerControl() {
130: public void stop() {
131: if (callback != null) {
132: callback = null;
133: }
134: }
135: };
136: }
137:
138: private void handleMessage(PipeMessage pm) {
139: if (log.isDebugEnabled()) {
140: log.debug("server-recv: " + pm);
141: }
142:
143: if (callback == null) {
144: // error? no listener..
145: if (log.isErrorEnabled()) {
146: log
147: .error("Unable to handle message, no callback listener!");
148: }
149: return;
150: }
151:
152: final MessageAddress target = pm.getOriginator();
153: Map metaData = pm.getMetaData();
154: final UID sessionId = pm.getSessionId();
155: int counter = pm.getCounter();
156:
157: List data = pm.getData();
158: Object last = (data.isEmpty() ? null : data
159: .get(data.size() - 1));
160: boolean is_last = (last == Tokens.CLOSE);
161:
162: // create pipe or look it up if this is an existing connection
163: final InputPipe ip;
164: synchronized (input_pipes) {
165: if (counter <= 0) {
166: ip = new InputPipe();
167: if (!is_last) {
168: input_pipes.put(sessionId, ip);
169: }
170: } else {
171: ip = (InputPipe) input_pipes.get(sessionId);
172: if (is_last) {
173: input_pipes.remove(sessionId);
174: }
175: }
176: }
177: if (ip == null) {
178: if (log.isWarnEnabled()) {
179: log.warn("Unknown sessionId specified by " + pm);
180: }
181: return;
182: }
183:
184: if (log.isDebugEnabled()) {
185: log.debug("ip<" + sessionId + ">.deliver(" + counter
186: + ", ..");
187: }
188: ip.deliver(counter, metaData, data);
189: if (counter > 0) {
190: // just an input update, don't invoke new "accept" callback
191: return;
192: }
193:
194: if (log.isInfoEnabled()) {
195: log.info("Handling servlet request from " + target + ": "
196: + pm);
197: }
198:
199: Deliverer sender = new Deliverer() {
200: public void deliver(int seq, Map meta, List dat) {
201: PipeMessage pm = new PipeMessage(msgSwitch
202: .getMessageAddress(), target,
203: MessagingClientFactory.TYPE, sessionId, seq,
204: meta, dat);
205: // TODO specify message timeout equal to client's metaData read timeout
206: if (log.isDebugEnabled()) {
207: log.debug("server-send: " + pm);
208: }
209: msgSwitch.sendMessage(pm);
210: }
211: };
212: final OutputPipe op = new OutputPipe(
213: "mts engine output pipe target=" + target + " session="
214: + sessionId, log, threadService, sender, null,
215: nagle);
216:
217: final Connection con = new Connection() {
218: public Map getMetaData() {
219: return ip.getMetaData();
220: }
221:
222: public AnnotatedInputStream getInputStream()
223: throws IOException {
224: return ip.getInputStream();
225: }
226:
227: public AnnotatedOutputStream getOutputStream()
228: throws IOException {
229: return op.getOutputStream();
230: }
231:
232: public void close() throws IOException {
233: ip.close();
234: op.close();
235: synchronized (input_pipes) {
236: input_pipes.remove(sessionId);
237: }
238: }
239: };
240:
241: // run our "accept" callback in a separate thread, since servlets can block
242: //
243: // we want servlets to run in parallel threads
244: Runnable r = new Runnable() {
245: public void run() {
246: try {
247: if (log.isDebugEnabled()) {
248: log.debug("accept con<" + sessionId + ">");
249: }
250: callback.accept(con);
251: } catch (Exception e) {
252: if (log.isErrorEnabled()) {
253: log.error("Accept failed for sessionId="
254: + sessionId, e);
255: }
256: }
257: }
258: };
259: Schedulable thread = threadService.getThread(this , r,
260: "mts engine servlet runner target=" + target
261: + " session=" + sessionId,
262: ThreadService.WILL_BLOCK_LANE);
263: thread.start();
264: }
265: }
|