001: /*
002: * <copyright>
003: *
004: * Copyright 2003-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.lib.aggagent.servlet;
027:
028: import java.io.IOException;
029: import java.io.PrintWriter;
030: import java.util.HashMap;
031: import java.util.Map;
032:
033: import javax.servlet.http.HttpServlet;
034: import javax.servlet.http.HttpServletRequest;
035: import javax.servlet.http.HttpServletResponse;
036:
037: import org.cougaar.core.blackboard.IncrementalSubscription;
038: import org.cougaar.core.blackboard.Subscription;
039: import org.cougaar.core.service.BlackboardService;
040: import org.cougaar.lib.aggagent.session.IncrementFormat;
041: import org.cougaar.lib.aggagent.session.RemoteSession;
042: import org.cougaar.lib.aggagent.session.SubscriptionAccess;
043: import org.cougaar.lib.aggagent.session.SubscriptionWrapper;
044: import org.cougaar.lib.aggagent.session.XmlIncrement;
045: import org.cougaar.lib.aggagent.util.Const;
046: import org.cougaar.util.UnaryPredicate;
047:
048: /**
049: * This keep-alive component is used for monitoring the aggregation agent's
050: * blackboard by way of incremental updates passed over a keep alive
051: * connection.
052: */
053: public class AggregationKeepAliveComponent extends
054: BlackboardServletComponent {
055: private Map sessionMap = new HashMap();
056: private int sessionCounter = 0;
057:
058: /**
059: * Constructor.
060: */
061: public AggregationKeepAliveComponent() {
062: super ();
063: myServlet = new AggregationKeepAliveServlet();
064: }
065:
066: /**
067: * Here is our inner class that will handle all HTTP and
068: * HTTPS service requests for our <tt>myPath</tt>.
069: */
070: private class AggregationKeepAliveServlet extends HttpServlet {
071: /**
072: * Run a keep-alive session. This method will continue to send lines of
073: * output to the provided PrintStream until an error is detected.
074: */
075: public void doPut(HttpServletRequest request,
076: HttpServletResponse response) throws IOException {
077: if (log.isDebugEnabled())
078: log.debug("doPut");
079:
080: // check if this is a cancel request
081: String cancelSessionId = request
082: .getParameter("CANCEL_SESSION_ID");
083: if (cancelSessionId != null) {
084: synchronized (sessionMap) {
085: sessionMap.put(cancelSessionId, Boolean.TRUE);
086: }
087: return; // done canceling session
088: }
089:
090: //
091: // Handle Keep Alive Session Request
092: //
093: PrintWriter out = new PrintWriter(response
094: .getOutputStream());
095: KeepAliveSession kaSession = null;
096:
097: // establish session id, send to client
098: String this Session;
099: synchronized (sessionMap) {
100: this Session = String.valueOf(sessionCounter++);
101: sessionMap.put(this Session, new Boolean(false));
102: }
103: synchronized (out) {
104: out.println("<session_created id=\"" + this Session
105: + "\" />");
106: endMessage(out);
107: }
108:
109: try {
110: AggregationXMLInterface.MonitorRequestParser monitorRequest = new AggregationXMLInterface.MonitorRequestParser(
111: request);
112: kaSession = new KeepAliveSession(agentId.toString(),
113: blackboard, createSubscriptionSupport(),
114: monitorRequest.unaryPredicate,
115: new XmlIncrement(monitorRequest.xmlEncoder),
116: out);
117:
118: boolean outputError = false;
119: boolean sessionCanceled = false;
120:
121: while ((!outputError) && (!sessionCanceled)) {
122: if (log.isDebugEnabled())
123: log.debug("---------Keep Alive Session "
124: + this Session + " is Alive---------");
125:
126: Thread.sleep(5000);
127:
128: // Without an ack message, a keep alive connection over which no
129: // updates are sent will never die
130: // (even without a client on the other end).
131: synchronized (out) {
132: out.print(Const.KEEP_ALIVE_ACK_MESSAGE);
133: endMessage(out);
134: outputError = out.checkError();
135: }
136:
137: synchronized (sessionMap) {
138: sessionCanceled = ((Boolean) sessionMap
139: .get(this Session)).booleanValue();
140: }
141: }
142: } catch (Exception done_in) {
143: if (log.isDebugEnabled())
144: log.debug("doPut: aborted!");
145: } finally {
146: kaSession.cancel();
147: synchronized (sessionMap) {
148: sessionMap.remove(this Session);
149: }
150: if (log.isDebugEnabled())
151: log.debug("doPut: leaving");
152: }
153: }
154: }
155:
156: private class KeepAliveSession extends RemoteSession implements
157: SubscriptionListener {
158: PrintWriter out = null;
159: Subscription rawData = null;
160: SubscriptionAccess data = null;
161: SubscriptionMonitorSupport sms = null;
162:
163: KeepAliveSession(String agentId, BlackboardService blackboard,
164: SubscriptionMonitorSupport sms,
165: UnaryPredicate predicate, IncrementFormat format,
166: PrintWriter out) {
167: super ("", "", format);
168: setAgentId(agentId);
169: this .out = out;
170: this .sms = sms;
171:
172: // This is a separate transaction from the one that calls
173: // subscriptionChanged. No additional synchronization is necessary.
174: try {
175: blackboard.openTransaction();
176: rawData = blackboard.subscribe(predicate, true);
177: sms.setSubscriptionListener(rawData, this );
178: data = new SubscriptionWrapper(
179: (IncrementalSubscription) rawData);
180: } catch (Exception e) {
181: e.printStackTrace();
182: } finally {
183: blackboard.closeTransactionDontReset();
184: }
185: }
186:
187: public SubscriptionAccess getData() {
188: return data;
189: }
190:
191: /**
192: * This method is called by the plugin component's execute() whenever new
193: * subscription information is available.
194: *
195: * (since it is called from BlackboardServletComponent.execute() method;
196: * it is in a separate blackboard transaction from the constructor and
197: * cancel method)
198: */
199: public void subscriptionChanged(Subscription sub) {
200: if (out != null) {
201: synchronized (out) {
202: sendUpdate(out);
203: endMessage(out);
204: }
205: }
206: }
207:
208: /**
209: * Send an update of recent changes to the resident Subscription
210: * through the provided OutputStream. An IncrementFormat instance
211: * is used to encode the data being sent.
212: *
213: * This is called from the execute() method, don't open a transaction.
214: */
215: public void sendUpdate(PrintWriter out) {
216: out.println(createUpdateDelta().toXml());
217: out.flush();
218: }
219:
220: public void cancel() {
221: sms.removeSubscriptionListener(rawData);
222: try {
223: blackboard.openTransaction();
224: blackboard.unsubscribe(rawData);
225: } catch (Exception e) {
226: e.printStackTrace();
227: } finally {
228: blackboard.closeTransactionDontReset();
229: }
230: }
231: }
232:
233: private static void endMessage(PrintWriter out) {
234: out.print('\f');
235: out.flush();
236: }
237: }
|