001: // ========================================================================
002: // Copyright 2006 Mort Bay Consulting Pty. Ltd.
003: // ------------------------------------------------------------------------
004: // Licensed under the Apache License, Version 2.0 (the "License");
005: // you may not use this file except in compliance with the License.
006: // You may obtain a copy of the License at
007: // http://www.apache.org/licenses/LICENSE-2.0
008: // Unless required by applicable law or agreed to in writing, software
009: // distributed under the License is distributed on an "AS IS" BASIS,
010: // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011: // See the License for the specific language governing permissions and
012: // limitations under the License.
013: //========================================================================
014:
015: package org.mortbay.cometd;
016:
017: import java.io.IOException;
018: import java.lang.reflect.Array;
019: import java.util.HashMap;
020: import java.util.List;
021: import java.util.Map;
022:
023: import javax.servlet.ServletException;
024: import javax.servlet.http.Cookie;
025: import javax.servlet.http.HttpServlet;
026: import javax.servlet.http.HttpServletRequest;
027: import javax.servlet.http.HttpServletResponse;
028:
029: import org.mortbay.util.TypeUtil;
030: import org.mortbay.util.ajax.Continuation;
031: import org.mortbay.util.ajax.ContinuationSupport;
032:
033: /** Cometd Filter
034: * Servlet implementing the {@link Bayeux} protocol.
035: *
036: * The Servlet can be initialized with a json file mapping channels to {@link DataFilter} definitions.
037: * The servlet init parameter "filters" should point to a webapplication resource containing a JSON
038: * array of filter definitions. For example: <pre>
039: * [
040: * {
041: * "channels": "/**",
042: * "class" : "org.mortbay.cometd.filter.NoMarkupFilter",
043: * "init" : {}
044: * }
045: * ]
046: * </pre>
047: *
048: * The init parameter "timeout" specifies the poll timeout in milliseconds (default 45000).
049: * The init parameter "multiTimeout" specifies the poll timeout if multiple polls are detected from the
050: * same browser (default 0 - disable browser detection).
051: *
052: * @author gregw
053: * @see {@link Bayeux}
054: * @see {@link ChannelPattern}
055: */
056: public class CometdServlet extends HttpServlet {
057: public static final String ORG_MORTBAY_BAYEUX = "org.mortbay.bayeux";
058: public static final String CLIENT_ATTR = "org.mortbay.cometd.client";
059: public static final String MESSAGE_PARAM = "message";
060: public static final String TUNNEL_INIT_PARAM = "tunnelInit";
061: public static final String BROWSER_ID = "bayeuxBID";
062:
063: private Bayeux _bayeux;
064: private long _timeout = 45000;
065: private long _multiTimeout = 0;
066: private Object _multiAdvice = null;
067: private Map _bidCount = new HashMap();
068: private boolean _verbose;
069:
070: public void init() throws ServletException {
071: synchronized (CometdServlet.class) {
072: _bayeux = (Bayeux) getServletContext().getAttribute(
073: ORG_MORTBAY_BAYEUX);
074: if (_bayeux == null) {
075: _bayeux = new Bayeux(getServletContext());
076: getServletContext().setAttribute(ORG_MORTBAY_BAYEUX,
077: _bayeux);
078: }
079: }
080:
081: String filters = getInitParameter("filters");
082: if (filters != null) {
083: try {
084: Object[] objects = (Object[]) JSON
085: .parse(getServletContext().getResourceAsStream(
086: filters));
087: for (int i = 0; objects != null && i < objects.length; i++) {
088: Map filter_def = (Map) objects[i];
089:
090: Class c = Thread.currentThread()
091: .getContextClassLoader().loadClass(
092: (String) filter_def.get("class"));
093: DataFilter filter = (DataFilter) c.newInstance();
094: filter.init(filter_def.get("init"));
095: _bayeux.addFilter((String) filter_def
096: .get("channels"), filter);
097: }
098: } catch (Exception e) {
099: e.printStackTrace();
100: getServletContext().log("Could not parse: " + filters,
101: e);
102: throw new ServletException(e);
103: }
104: }
105:
106: String timeout = getInitParameter("timeout");
107: if (timeout != null)
108: _timeout = Long.parseLong(timeout);
109:
110: String multiTimeout = getInitParameter("multi-timeout");
111: if (multiTimeout != null) {
112: _multiTimeout = Long.parseLong(multiTimeout);
113: _multiAdvice = new JSON.Literal(
114: "{\"status\":\"multipleconnections\",\"reconnect\":\"retry\",\"interval\":"
115: + _multiTimeout
116: + ",\"transport\":{\"long-polling\":{}}}");
117: }
118:
119: String verbose = getInitParameter("verbose");
120: if (verbose != null)
121: _verbose = Boolean.parseBoolean(verbose);
122:
123: }
124:
125: protected void service(HttpServletRequest req,
126: HttpServletResponse resp) throws ServletException,
127: IOException {
128: String init = req.getParameter(TUNNEL_INIT_PARAM);
129: if ("iframe".equals(init)) {
130: if (_verbose)
131: System.err
132: .println("--> Init Tunnel - IFRAME CURRENTLY BROKEN!!!!!!!");
133: Transport transport = new IFrameTransport();
134: ((IFrameTransport) transport).initTunnel(resp);
135: if (_verbose)
136: System.err.println("<-- Tunnel Over");
137: } else {
138: super .service(req, resp);
139: }
140: }
141:
142: protected void doPost(HttpServletRequest req,
143: HttpServletResponse resp) throws ServletException,
144: IOException {
145: // Look for an existing client and protect from context restarts
146: Object clientObj = req.getAttribute(CLIENT_ATTR);
147: Client client = (clientObj instanceof Client) ? (Client) clientObj
148: : null;
149: Transport transport = null;
150: Continuation continuation = null;
151: String bid = null;
152:
153: // Have we seen this request before?
154: if (client != null) {
155: // yes - extract saved properties
156: transport = (Transport) req
157: .getAttribute(Bayeux.TRANSPORT_ATTR);
158: transport.setResponse(resp);
159: bid = (String) req.getAttribute(BROWSER_ID);
160:
161: // Reduce browser ID counter
162: // TODO protect from exceptions
163: if (_multiTimeout > 0 && decBID(bid) == 0) {
164: /*
165: if (false) // TODO only reset if advised previously
166: _bayeux.advise(client,transport,null);
167: */
168: }
169: } else {
170: // No - process messages
171:
172: // Look for a browser ID
173: if (_multiTimeout > 0) {
174: Cookie[] cookies = req.getCookies();
175: for (int i = 0; cookies != null && i < cookies.length; i++) {
176: if (cookies[i].getName().equals(BROWSER_ID))
177: bid = cookies[i].getValue();
178: }
179: if (bid == null) {
180: long l1 = _bayeux._random.nextLong();
181: long l2 = _bayeux._random.nextLong();
182: bid = Long.toString(l1 < 0 ? -l1 : l1, 16)
183: + Long.toString(l2 < 0 ? -l2 : l2, 16);
184: Cookie cookie = new Cookie(BROWSER_ID, bid);
185: cookie.setPath("/");
186: resp.addCookie(cookie);
187: }
188: }
189:
190: // variables to loop through batches and messages
191: // May have multiple message parameters, each with an array of messages - or just a message
192: String[] batches = null;
193: int batch_index = 0;
194: Object batch = null;
195: int index = 0;
196: Map message = null;
197: int message_count = 0;
198:
199: batches = req.getParameterValues(MESSAGE_PARAM);
200:
201: // Loop to the first message - it may be handshake without a client
202: while (batch_index < batches.length) {
203: // Do we need to get another batch?
204: if (batch == null) {
205: if (_verbose)
206: System.err.println("=" + batch_index + "=>"
207: + batches[batch_index]);
208: index = 0;
209: batch = JSON.parse(batches[batch_index++]);
210: }
211:
212: if (batch == null)
213: continue;
214:
215: if (batch.getClass().isArray()) {
216: message = (Map) Array.get(batch, index++);
217: if (index >= Array.getLength(batch))
218: batch = null;
219: } else {
220: message = (Map) batch;
221: batch = null;
222: }
223:
224: message_count++;
225:
226: client = _bayeux.getClient((String) message
227: .get(Bayeux.CLIENT_ATTR));
228:
229: // If no client, this is a handshake
230: if (client == null) {
231: // handshake!
232: transport = _bayeux.newTransport(client, message);
233: transport.setResponse(resp);
234: _bayeux.handle(null, transport, message);
235: message = null;
236: }
237:
238: break;
239: }
240:
241: // Handle all client messages
242: if (client != null) {
243: // resolve transport
244: transport = _bayeux.newTransport(client, message);
245: transport.setResponse(resp);
246: if (_verbose
247: && transport instanceof PlainTextJSONTransport)
248: ((PlainTextJSONTransport) transport)
249: .setVerbose(_verbose);
250:
251: // continue handling messages with a known client and transport!
252: try {
253: // Tell client to hold messages as a response is likely to be sent.
254: // TODO client.responsePending();
255:
256: // handle any message left over from client loop above
257: if (message != null)
258: _bayeux.handle(client, transport, message);
259: message = null;
260:
261: // handle all other messages
262: while (batch_index < batches.length) {
263: // Do we need to get another batch?
264: if (batch == null) {
265: if (_verbose)
266: System.err.println("=" + batch_index
267: + "=>" + batches[batch_index]);
268: index = 0;
269: batch = JSON.parse(batches[batch_index++]);
270: }
271: if (batch == null)
272: continue;
273:
274: // get the next message
275: if (batch.getClass().isArray()) {
276: message = (Map) Array.get(batch, index++);
277: if (index >= Array.getLength(batch))
278: batch = null;
279: } else {
280: message = (Map) batch;
281: batch = null;
282: }
283:
284: // handle message
285: if (message != null)
286: _bayeux.handle(client, transport, message);
287: message = null;
288: }
289:
290: } finally {
291: // TODO client.responded();
292: }
293: }
294: }
295:
296: // Do we need to wait for messages or are we streaming?
297: while (transport.isPolling()) {
298: long timeout = _timeout;
299: continuation = ContinuationSupport.getContinuation(req,
300: client);
301:
302: // Get messages or wait
303: List messages = null;
304: synchronized (client) {
305: messages = client.takeMessages();
306:
307: if (messages == null && !continuation.isPending()) {
308: //check that only 1 request per browser is waiting
309: if (_multiTimeout > 0 && incBID(bid) > 1) {
310: // Advise that there are multiple windows waiting
311: // fall back to traditional polling
312: timeout = _multiTimeout;
313: _bayeux.advise(client, transport, _multiAdvice);
314: }
315:
316: // save state and suspend
317: client.addContinuation(continuation);
318: req.setAttribute(CLIENT_ATTR, client);
319: req.setAttribute(BROWSER_ID, bid);
320: req.setAttribute(Bayeux.TRANSPORT_ATTR, transport);
321: continuation.suspend(timeout);
322: client.removeContinuation(continuation);
323:
324: messages = client.takeMessages();
325: }
326: continuation.reset();
327: client.removeContinuation(continuation);
328:
329: if (messages == null) // timeout
330: transport.setPolling(false);
331:
332: }
333:
334: // Send the messages
335: if (messages != null) {
336: transport.send(messages);
337: }
338:
339: // Only a simple poll if the transport does not flush
340: if (!transport.keepAlive())
341: transport.setPolling(false);
342:
343: }
344:
345: // Send any left over messages.
346: /* TODO
347: if (client!=null)
348: {
349: List messages = client.takeMessages();
350: if (messages!=null)
351: transport.send(messages);
352: }
353: */
354: transport.complete();
355: }
356:
357: private int incBID(String bid) {
358: synchronized (_bidCount) {
359: Integer count = (Integer) _bidCount.get(bid);
360: count = TypeUtil.newInteger(count == null ? 1 : count
361: .intValue() + 1);
362: _bidCount.put(bid, count);
363: return count.intValue();
364: }
365: }
366:
367: private int decBID(String bid) {
368: synchronized (_bidCount) {
369: Integer count = (Integer) _bidCount.get(bid);
370: count = (count == null || count.intValue() <= 1) ? null
371: : TypeUtil.newInteger(count.intValue() - 1);
372: if (count == null) {
373: _bidCount.remove(bid);
374: return 0;
375: }
376: _bidCount.put(bid, count);
377: return count.intValue();
378: }
379: }
380:
381: protected void doGet(HttpServletRequest req,
382: HttpServletResponse resp) throws ServletException,
383: IOException {
384: doPost(req, resp);
385: }
386: }
|