001: // ========================================================================
002: // Copyright 2006 Mort Bay Consulting Pty. Ltd.
003: // ------------------------------------------------------------------------
004: // Licensed under the Apache License, Version 2.0 (the "License");
005: // you may not use this file except in compliance with the License.
006: // You may obtain a copy of the License at
007: // http://www.apache.org/licenses/LICENSE-2.0
008: // Unless required by applicable law or agreed to in writing, software
009: // distributed under the License is distributed on an "AS IS" BASIS,
010: // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011: // See the License for the specific language governing permissions and
012: // limitations under the License.
013: //========================================================================
014:
015: package org.mortbay.cometd;
016:
017: import java.io.IOException;
018: import java.security.SecureRandom;
019: import java.util.ArrayList;
020: import java.util.HashMap;
021: import java.util.Iterator;
022: import java.util.Map;
023: import java.util.Random;
024: import java.util.Set;
025:
026: import javax.servlet.ServletContext;
027:
028: import org.mortbay.util.DateCache;
029:
030: /* ------------------------------------------------------------ */
031: /**
032: * @author gregw
033: *
034: */
035: public class Bayeux {
036: public static final String META_CONNECT = "/meta/connect";
037: public static final String META_DISCONNECT = "/meta/disconnect";
038: public static final String META_HANDSHAKE = "/meta/handshake";
039: public static final String META_PING = "/meta/ping";
040: public static final String META_RECONNECT = "/meta/reconnect";
041: public static final String META_STATUS = "/meta/status";
042: public static final String META_SUBSCRIBE = "/meta/subscribe";
043: public static final String META_UNSUBSCRIBE = "/meta/unsubscribe";
044:
045: public static final String MONITOR_CHANNEL_EVENT = "/meta/monitor/channel/event";
046: public static final String MONITOR_CLIENT_EVENT = "/meta/monitor/client/event";
047:
048: public static final String CLIENT_ATTR = "clientId";
049: public static final String DATA_ATTR = "data";
050: public static final String CHANNEL_ATTR = "channel";
051: public static final String TIMESTAMP_ATTR = "timestamp";
052: public static final String TRANSPORT_ATTR = "transport";
053: public static final String ADVICE_ATTR = "advice";
054:
055: private static final JSON.Literal __NO_ADVICE = new JSON.Literal(
056: "{}");
057: HashMap _channels = new HashMap();
058: HashMap _clients = new HashMap();
059: ServletContext _context;
060: DateCache _dateCache = new DateCache();
061: Random _random;
062: HashMap _handlers = new HashMap();
063: HashMap _transports = new HashMap();
064: HashMap _filters = new java.util.HashMap();
065: ArrayList _filterOrder = new ArrayList();
066: SecurityPolicy _securityPolicy = new DefaultPolicy();
067: Object _advice = new JSON.Literal(
068: "{\"reconnect\":\"retry\",\"interval\":0}");
069: Object _unknownAdvice = new JSON.Literal(
070: "{\"reconnect\":\"handshake\",\"interval\":500}");
071:
072: {
073: _handlers.put("*", new PublishHandler());
074: _handlers.put(META_HANDSHAKE, new HandshakeHandler());
075: _handlers.put(META_CONNECT, new ConnectHandler());
076: _handlers.put(META_RECONNECT, new ReconnectHandler());
077: _handlers.put(META_DISCONNECT, new DisconnectHandler());
078: _handlers.put(META_SUBSCRIBE, new SubscribeHandler());
079: _handlers.put(META_UNSUBSCRIBE, new UnsubscribeHandler());
080: _handlers.put(META_STATUS, new StatusHandler());
081: _handlers.put(META_PING, new PingHandler());
082:
083: _transports.put("iframe", IFrameTransport.class);
084: _transports.put("long-polling", PlainTextJSONTransport.class);
085:
086: newChannel(MONITOR_CHANNEL_EVENT);
087: newChannel(MONITOR_CLIENT_EVENT);
088: }
089:
090: Bayeux(ServletContext context) {
091: _context = context;
092: try {
093: _random = SecureRandom.getInstance("SHA1PRNG");
094: } catch (Exception e) {
095: context.log(
096: "Could not get secure random for ID generation", e);
097: _random = new Random();
098: }
099: _random.setSeed(_random.nextLong() ^ hashCode()
100: ^ (context.hashCode() << 32)
101: ^ Runtime.getRuntime().freeMemory());
102: }
103:
104: /* ------------------------------------------------------------ */
105: /**
106: * @param id
107: * @return
108: */
109: public Channel getChannel(String id) {
110: return (Channel) _channels.get(id);
111: }
112:
113: /* ------------------------------------------------------------ */
114: /**
115: * @param channels A {@link ChannelPattern}
116: * @param filter The filter instance to apply to new channels matching the pattern
117: */
118: public void addFilter(String channels, DataFilter filter) {
119: synchronized (_filters) {
120: ChannelPattern pattern = new ChannelPattern(channels);
121: _filters.put(pattern, filter);
122: _filterOrder.remove(pattern);
123: _filterOrder.add(pattern);
124: }
125: }
126:
127: /* ------------------------------------------------------------ */
128: /**
129: * @param id
130: * @return
131: */
132: public Channel newChannel(String id) {
133: Channel channel = (Channel) _channels.get(id);
134: if (channel == null) {
135: channel = new Channel(id, this );
136:
137: Iterator p = _filterOrder.iterator();
138: while (p.hasNext()) {
139: ChannelPattern pattern = (ChannelPattern) p.next();
140: if (pattern.matches(id))
141: channel.addDataFilter((DataFilter) _filters
142: .get(pattern));
143: }
144:
145: _channels.put(id, channel);
146: }
147: return channel;
148: }
149:
150: /* ------------------------------------------------------------ */
151: /**
152: * @return
153: */
154: public Set getChannelIDs() {
155: return _channels.keySet();
156: }
157:
158: /* ------------------------------------------------------------ */
159: /**
160: * @param client_id
161: * @return
162: */
163: public synchronized Client getClient(String client_id) {
164: return (Client) _clients.get(client_id);
165: }
166:
167: /* ------------------------------------------------------------ */
168: /**
169: * @return
170: */
171: public synchronized Client newClient() {
172: return newClient(null);
173: }
174:
175: /* ------------------------------------------------------------ */
176: /**
177: * @param idPrefix Prefix to random client ID
178: * @return
179: */
180: public synchronized Client newClient(String idPrefix) {
181: Client client = new Client(this , idPrefix);
182: _clients.put(client.getId(), client);
183: return client;
184: }
185:
186: /* ------------------------------------------------------------ */
187: /**
188: * @return
189: */
190: public Set getClientIDs() {
191: return _clients.keySet();
192: }
193:
194: /* ------------------------------------------------------------ */
195: /**
196: * @return
197: */
198: String getTimeOnServer() {
199: return _dateCache.format(System.currentTimeMillis());
200: }
201:
202: /* ------------------------------------------------------------ */
203: /**
204: * @param client
205: * @param message
206: * @return
207: */
208: Transport newTransport(Client client, Map message) {
209: try {
210: String type = client == null ? null : client
211: .getConnectionType();
212: if (type == null)
213: type = (String) message.get("connectionType");
214:
215: if (type != null) {
216: Class trans_class = (Class) _transports.get(type);
217: if (trans_class != null)
218: return (Transport) (trans_class.newInstance());
219: }
220: return new PlainTextJSONTransport();
221: } catch (Exception e) {
222: throw new RuntimeException(e);
223: }
224: }
225:
226: /* ------------------------------------------------------------ */
227: /**
228: * @param client
229: * @param transport
230: * @param message
231: * @return
232: */
233: void handle(Client client, Transport transport, Map message)
234: throws IOException {
235: String channel_id = (String) message.get(CHANNEL_ATTR);
236:
237: Handler handler = (Handler) _handlers.get(channel_id);
238: if (handler == null)
239: handler = (Handler) _handlers.get("*");
240:
241: handler.handle(client, transport, message);
242: }
243:
244: /* ------------------------------------------------------------ */
245: void advise(Client client, Transport transport, Object advice)
246: throws IOException {
247: if (advice == null)
248: advice = _advice;
249: if (advice == null)
250: advice = __NO_ADVICE;
251: String connection_id = "/meta/connections/" + client.getId();
252: Map reply = new HashMap();
253: reply.put(CHANNEL_ATTR, connection_id);
254: reply.put("connectionId", connection_id);
255: reply.put("timestamp", _dateCache.format(System
256: .currentTimeMillis()));
257: reply.put("successful", Boolean.TRUE);
258: reply.put(ADVICE_ATTR, advice);
259: transport.send(reply);
260: }
261:
262: /* ------------------------------------------------------------ */
263: long getRandom(long variation) {
264: long l = _random.nextLong() ^ variation;
265: return l < 0 ? -l : l;
266: }
267:
268: /* ------------------------------------------------------------ */
269: public SecurityPolicy getSecurityPolicy() {
270: return _securityPolicy;
271: }
272:
273: /* ------------------------------------------------------------ */
274: public void setSecurityPolicy(SecurityPolicy securityPolicy) {
275: _securityPolicy = securityPolicy;
276: }
277:
278: /* ------------------------------------------------------------ */
279: /* ------------------------------------------------------------ */
280: private interface Handler {
281: void handle(Client client, Transport transport, Map message)
282: throws IOException;
283: }
284:
285: /* ------------------------------------------------------------ */
286: /* ------------------------------------------------------------ */
287: private class ConnectHandler implements Handler {
288: public void handle(Client client, Transport transport,
289: Map message) throws IOException {
290: Map reply = new HashMap();
291: reply.put(CHANNEL_ATTR, META_CONNECT);
292:
293: if (client == null)
294: throw new IllegalStateException("No client");
295: String type = (String) message.get("connectionType");
296: client.setConnectionType(type);
297:
298: Channel connection = client.connect();
299: if (connection != null) {
300: reply.put("successful", Boolean.TRUE);
301: reply.put("error", "");
302: reply.put("connectionId", connection.getId());
303: } else {
304: reply.put("successful", Boolean.FALSE);
305: reply.put("error", "unknown client ID");
306: if (_unknownAdvice != null)
307: reply.put(ADVICE_ATTR, _unknownAdvice);
308: }
309: reply.put("timestamp", _dateCache.format(System
310: .currentTimeMillis()));
311: transport.send(reply);
312: transport.setPolling(false);
313:
314: // send event to monitoring channel
315: getChannel(MONITOR_CLIENT_EVENT).publish(reply, client);
316: }
317: }
318:
319: /* ------------------------------------------------------------ */
320: /* ------------------------------------------------------------ */
321: private class PublishHandler implements Handler {
322: public void handle(Client client, Transport transport,
323: Map message) throws IOException {
324: String channel_id = (String) message.get("channel");
325:
326: Channel channel = getChannel(channel_id);
327:
328: Object data = message.get("data");
329:
330: if (client == null) {
331: if (_securityPolicy.authenticate((String) message
332: .get("authScheme"), (String) message
333: .get("authUser"), (String) message
334: .get("authToken")))
335: client = newClient(null);
336: }
337:
338: Map reply = new HashMap();
339: reply.put(CHANNEL_ATTR, channel_id);
340: if (channel != null
341: && data != null
342: && _securityPolicy
343: .canSend(client, channel, message)) {
344: channel.publish(data, client);
345: reply.put("successful", Boolean.TRUE);
346: reply.put("error", "");
347: } else {
348: reply.put("successful", Boolean.FALSE);
349: reply.put("error", "unknown channel");
350: }
351: transport.send(reply);
352: }
353: }
354:
355: /* ------------------------------------------------------------ */
356: /* ------------------------------------------------------------ */
357: private class DisconnectHandler implements Handler {
358: public void handle(Client client, Transport transport,
359: Map message) {
360: }
361: }
362:
363: /* ------------------------------------------------------------ */
364: /* ------------------------------------------------------------ */
365: private class HandshakeHandler implements Handler {
366: public void handle(Client client, Transport transport,
367: Map message) throws IOException {
368: if (client != null)
369: throw new IllegalStateException();
370:
371: if (_securityPolicy
372: .authenticate((String) message.get("authScheme"),
373: (String) message.get("authUser"),
374: (String) message.get("authToken")))
375: client = newClient(null);
376:
377: Map reply = new HashMap();
378: reply.put(CHANNEL_ATTR, META_HANDSHAKE);
379: reply.put("version", new Double(0.1));
380: reply.put("minimumVersion", new Double(0.1));
381:
382: if (client != null) {
383: reply.put("supportedConnectionTypes", new String[] {
384: "long-polling", "iframe" });
385: reply.put("authSuccessful", Boolean.TRUE);
386: reply.put(CLIENT_ATTR, client.getId());
387: if (_advice != null)
388: reply.put(ADVICE_ATTR, _advice);
389: } else {
390: reply.put("authSuccessful", Boolean.FALSE);
391: if (_advice != null)
392: reply.put(ADVICE_ATTR, _advice);
393: }
394:
395: transport.send(reply);
396: }
397: }
398:
399: /* ------------------------------------------------------------ */
400: /* ------------------------------------------------------------ */
401: private class PingHandler implements Handler {
402: public void handle(Client client, Transport transport,
403: Map message) throws IOException {
404: }
405: }
406:
407: /* ------------------------------------------------------------ */
408: /* ------------------------------------------------------------ */
409: private class ReconnectHandler implements Handler {
410: public void handle(Client client, Transport transport,
411: Map message) throws IOException {
412: // TODO check other parameters.
413:
414: String connection_id = "/meta/connections/"
415: + message.get(CLIENT_ATTR);
416: Map reply = new HashMap();
417: reply.put(CHANNEL_ATTR, META_RECONNECT);
418: reply.put("connectionId", connection_id);
419: reply.put("timestamp", _dateCache.format(System
420: .currentTimeMillis()));
421:
422: if (client == null) {
423: reply.put("successful", Boolean.FALSE);
424: reply.put("error", "unknown clientID");
425: if (_unknownAdvice != null)
426: reply.put(ADVICE_ATTR, _unknownAdvice);
427: transport.setPolling(false);
428: transport.send(reply);
429: } else {
430: String type = (String) message.get("connectionType");
431: if (type != null)
432: client.setConnectionType(type);
433: reply.put("successful", Boolean.TRUE);
434: reply.put("error", "");
435: transport.setPolling(true);
436: transport.send(reply);
437: }
438:
439: // send event to monitoring channel
440: getChannel(MONITOR_CLIENT_EVENT).publish(reply, client);
441: }
442: }
443:
444: /* ------------------------------------------------------------ */
445: /* ------------------------------------------------------------ */
446: private class StatusHandler implements Handler {
447: public void handle(Client client, Transport transport,
448: Map message) throws IOException {
449: }
450: }
451:
452: /* ------------------------------------------------------------ */
453: /* ------------------------------------------------------------ */
454: private class SubscribeHandler implements Handler {
455: public void handle(Client client, Transport transport,
456: Map message) throws IOException {
457: if (client == null)
458: throw new IllegalStateException("No client");
459:
460: String channel_id = (String) message.get("subscription");
461:
462: // select a random channel ID if none specifified
463: if (channel_id == null) {
464: channel_id = Long.toString(getRandom(message.hashCode()
465: ^ client.hashCode()), 36);
466: while (getChannel(channel_id) != null)
467: channel_id = Long.toString(getRandom(message
468: .hashCode()
469: ^ client.hashCode()), 36);
470: }
471:
472: // get the channel (or create if permitted)
473: Channel channel = getChannel(channel_id);
474: if (channel == null
475: && _securityPolicy.canCreate(client, channel,
476: message))
477: channel = newChannel(channel_id);
478:
479: Map reply = new HashMap();
480: reply.put(CHANNEL_ATTR, channel_id);
481: reply.put("subscription", channel.getId());
482:
483: if (channel != null
484: && _securityPolicy.canSubscribe(client, channel,
485: message)) {
486: channel.addSubscriber(client);
487: reply.put("successful", Boolean.TRUE);
488: reply.put("error", "");
489: } else {
490: reply.put("successful", Boolean.FALSE);
491: reply.put("error", "cannot subscribe");
492: }
493: transport.send(reply);
494:
495: //this is ugly. but the transport does not operate in a seperate thread so it should not
496: //do any harm. maybe it is better to not pass the reply to the monitoring
497: //channels but instead a custom object
498: reply.put(CHANNEL_ATTR, message.get(CHANNEL_ATTR));
499: // send event to monitoring channel
500: getChannel(MONITOR_CHANNEL_EVENT).publish(reply, client);
501: }
502: }
503:
504: /* ------------------------------------------------------------ */
505: /* ------------------------------------------------------------ */
506: private class UnsubscribeHandler implements Handler {
507: public void handle(Client client, Transport transport,
508: Map message) throws IOException {
509: if (client == null)
510: return;
511:
512: String channel_id = (String) message.get("subscription");
513: Channel channel = getChannel(channel_id);
514: if (channel != null)
515: channel.removeSubscriber(client);
516:
517: Map reply = new HashMap();
518: reply.put(CHANNEL_ATTR, channel_id);
519: reply.put("subscription", channel.getId());
520: reply.put("successful", Boolean.TRUE);
521: reply.put("error", "");
522: transport.send(reply);
523:
524: //this is ugly. but the transport does not operate in a seperate thread so it should not
525: //do any harm. maybe it is better to not pass the reply to the monitoring
526: //channels but instead a custom object
527: reply.put(CHANNEL_ATTR, message.get(CHANNEL_ATTR));
528: // send event to monitoring channel
529: getChannel(MONITOR_CHANNEL_EVENT).publish(reply, client);
530: }
531: }
532:
533: /* ------------------------------------------------------------ */
534: /* ------------------------------------------------------------ */
535: private static class DefaultPolicy implements SecurityPolicy {
536:
537: public boolean canCreate(Client client, Channel channel,
538: Map message) {
539: return client != null;
540: // TODO return !channel.getId().startsWith("/meta/");
541: }
542:
543: public boolean canSubscribe(Client client, Channel channel,
544: Map message) {
545: return client != null;
546: // TODO return !channel.getId().startsWith("/meta/");
547: }
548:
549: public boolean canSend(Client client, Channel channel,
550: Map message) {
551: return client != null;
552: //TODO return !channel.getId().startsWith("/meta/");
553: }
554:
555: public boolean authenticate(String scheme, String user,
556: String credentials) {
557: // TODO Auto-generated method stub
558: return true;
559: }
560:
561: }
562: }
|