001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2007 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.lib.web.micro.mts;
028:
029: import java.io.InputStream;
030: import java.io.OutputStream;
031: import java.util.HashMap;
032: import java.util.List;
033: import java.util.Map;
034:
035: import org.cougaar.core.agent.service.MessageSwitchService;
036: import org.cougaar.core.mts.Message;
037: import org.cougaar.core.mts.MessageAddress;
038: import org.cougaar.core.mts.MessageHandler;
039: import org.cougaar.core.service.LoggingService;
040: import org.cougaar.core.service.ThreadService;
041: import org.cougaar.core.service.UIDService;
042: import org.cougaar.core.util.UID;
043: import org.cougaar.lib.web.micro.base.AnnotatedInputStream;
044: import org.cougaar.lib.web.micro.base.AnnotatedOutputStream;
045: import org.cougaar.lib.web.micro.base.ClientFactory;
046: import org.cougaar.lib.web.micro.base.Connection;
047:
048: /**
049: * A connection factory for creating client requests, such as to support
050: * our {@link MessagingServletTunnel}.
051: */
052: public class MessagingClientFactory implements ClientFactory {
053:
054: /** A {@link PipeMessage} tag for messages we should receive */
055: public static final String TYPE = "client";
056:
057: private final LoggingService log;
058: private final UIDService uids;
059: private final ThreadService threadService;
060: private final MessageSwitchService msgSwitch;
061: private final long nagle;
062:
063: // map from session UID to Pipe
064: private final Map input_pipes = new HashMap();
065:
066: // todo for processing input messages
067: private final TodoQueue todo;
068:
069: /**
070: * @param nagle see {@link OutputPipe}
071: */
072: public MessagingClientFactory(LoggingService log, UIDService uids,
073: ThreadService threadService,
074: MessageSwitchService msgSwitch, long nagle) {
075: this .log = log;
076: this .uids = uids;
077: this .threadService = threadService;
078: this .msgSwitch = msgSwitch;
079: this .nagle = nagle;
080:
081: String s = (log == null ? "log" : uids == null ? "uids"
082: : threadService == null ? "threadService"
083: : msgSwitch == null ? "msgSwitch" : null);
084: if (s != null) {
085: throw new IllegalArgumentException("null " + s);
086: }
087:
088: // this thread won't block
089: this .todo = new TodoQueue(log, threadService,
090: "mts tunnel receiver", ThreadService.BEST_EFFORT_LANE) {
091: protected void doNow(Object o) {
092: handleMessage((PipeMessage) o);
093: }
094: };
095: }
096:
097: public void start() {
098: MessageHandler handler = new MessageHandler() {
099: // called in the mts thread
100: public boolean handleMessage(Message m) {
101: if (!(m instanceof PipeMessage))
102: return false;
103: PipeMessage pm = (PipeMessage) m;
104: if (!TYPE.equals(pm.getType()))
105: return false;
106: // switch threads
107: todo.add(pm);
108: return true;
109: }
110: };
111: msgSwitch.addMessageHandler(handler);
112: }
113:
114: public void stop() {
115: // no "msgSwitch.removeMessageHandler(handler)" method!
116: }
117:
118: // called in our "todo" thread
119: private void handleMessage(PipeMessage pm) {
120: if (log.isDebugEnabled()) {
121: log.debug("client-recv: " + pm);
122: }
123:
124: // lookup pipe
125: UID sessionId = pm.getSessionId();
126:
127: List data = pm.getData();
128: Object last = (data.isEmpty() ? null : data
129: .get(data.size() - 1));
130: boolean is_last = (last == Tokens.CLOSE);
131:
132: InputPipe ip;
133: synchronized (input_pipes) {
134: ip = (InputPipe) input_pipes.get(sessionId);
135: if (is_last) {
136: input_pipes.remove(sessionId);
137: }
138: }
139: if (ip == null) {
140: // unknown? response to dead/closed pipe?
141: if (log.isWarnEnabled()) {
142: log.warn("Unknown sessionId specified by " + pm);
143: }
144: return;
145: }
146:
147: // deliver to input pipe
148: ip.deliver(pm.getCounter(), pm.getMetaData(), data);
149: }
150:
151: public Connection connect(Object o, Map metaData) {
152: if (!(o instanceof MessageAddress)) {
153: throw new IllegalArgumentException(
154: "Expecting a MessageAddress, not "
155: + (o == null ? "null" : o.getClass()
156: .getName()));
157: }
158: MessageAddress address = (MessageAddress) o;
159:
160: return makeConnection(address, metaData);
161: }
162:
163: private Connection makeConnection(final MessageAddress target,
164: final Map metaData) {
165:
166: // create pipes
167: final UID sessionId = uids.nextUID();
168: final InputPipe ip = new InputPipe();
169: synchronized (input_pipes) {
170: input_pipes.put(sessionId, ip);
171: }
172: Deliverer sender = new Deliverer() {
173: public void deliver(int counter, Map meta, List data) {
174: PipeMessage pm = new PipeMessage(msgSwitch
175: .getMessageAddress(), target,
176: MessagingServerFactory.TYPE, sessionId,
177: counter, meta, data);
178: // TODO specify message timeout equal to server's metaData read timeout
179: if (log.isDebugEnabled()) {
180: log.debug("client-send: " + pm);
181: }
182: msgSwitch.sendMessage(pm);
183: }
184: };
185: final OutputPipe op = new OutputPipe("mts tunnel target="
186: + target + " session=" + sessionId, log, threadService,
187: sender, metaData, nagle);
188:
189: return new Connection() {
190: public Map getMetaData() {
191: return null; // not applicable
192: }
193:
194: public AnnotatedInputStream getInputStream() {
195: return ip.getInputStream();
196: }
197:
198: public AnnotatedOutputStream getOutputStream() {
199: return op.getOutputStream();
200: }
201:
202: public void close() {
203: ip.close();
204: op.close();
205: synchronized (input_pipes) {
206: input_pipes.remove(sessionId);
207: }
208: }
209: };
210: }
211: }
|