001: package org.obe.event;
002:
003: import org.obe.client.api.model.MIMETypes;
004: import org.obe.client.api.repository.EventParameter;
005: import org.obe.client.api.repository.EventTypeMetaData;
006: import org.obe.client.api.repository.RepositoryException;
007: import org.obe.engine.EngineContext;
008: import org.obe.engine.WorkflowEngineUtilities;
009: import org.obe.engine.repository.AbstractRepository;
010: import org.obe.engine.util.Comparators;
011: import org.obe.spi.WorkflowContext;
012: import org.obe.spi.evaluator.Evaluator;
013: import org.obe.spi.evaluator.EvaluatorException;
014: import org.obe.spi.event.ApplicationEvent;
015: import org.obe.spi.event.ApplicationEventListener;
016: import org.obe.spi.service.ApplicationEventBroker;
017: import org.obe.spi.service.ServerConfig;
018: import org.obe.spi.service.ServiceManager;
019: import org.obe.spi.util.AsyncFireApplicationEvent;
020: import org.obe.util.CommonConfig;
021: import org.obe.xpdl.model.condition.Condition;
022: import org.obe.xpdl.model.misc.Duration;
023:
024: import java.io.IOException;
025: import java.io.InputStream;
026: import java.io.Serializable;
027: import java.util.*;
028:
029: /*
030: Problems still to solve:
031: 1. how to prevent two threads or VMs from processing the same subscriptions.
032: ==> rely on database locking ==> SELECT FOR UPDATE, etc.
033: 2. how to ensure that an inbound event is not lost if a matching subscription
034: has not yet been created.
035: ==> persist the event with its key hash.
036: */
037:
038: /**
039: * Abstract event broker that stores its event definitions in XML. Subclasses
040: * provide matching services event and subscription storage.
041: *
042: * @author Adrian Price
043: */
044: public abstract class AbstractApplicationEventBroker extends
045: AbstractRepository implements ApplicationEventBroker {
046:
047: private static final boolean SUPPORTS_KEY_BASED_SUBSCRIPTIONS = ServerConfig
048: .supportsKeyBasedEventSubscriptions();
049:
050: protected boolean _predicateWarningIssued;
051: private final List _listeners = new ArrayList();
052: private final Map _mimeTypeForClass = new HashMap();
053: private final Map _mimeClasses = new HashMap();
054: private final Map _contentHandlers = new HashMap();
055:
056: protected AbstractApplicationEventBroker(ServiceManager svcMgr) {
057: super (svcMgr, EventTypeMetaData.class);
058: }
059:
060: public String getServiceName() {
061: return SERVICE_NAME;
062: }
063:
064: public synchronized void init() throws IOException,
065: RepositoryException {
066: super .init();
067:
068: // Load the MIME type mappings.
069: Properties mimeTypes = new Properties();
070: InputStream in = CommonConfig
071: .openInputStream("Class2MimeType.properties");
072: if (in != null) {
073: try {
074: mimeTypes.load(in);
075: } finally {
076: in.close();
077: }
078: for (Iterator iter = mimeTypes.entrySet().iterator(); iter
079: .hasNext();) {
080: Map.Entry entry = (Map.Entry) iter.next();
081: String className = (String) entry.getKey();
082: String mimeType = (String) entry.getValue();
083: try {
084: Class clazz = Class.forName(className);
085: _mimeTypeForClass.put(clazz, entry.getValue());
086: } catch (ClassNotFoundException e) {
087: getLogger().error(
088: "Unable to load class '" + className
089: + "' for MIME type '" + mimeType
090: + '\'');
091: }
092: }
093: }
094:
095: // Load the MIME content handlers.
096: in = CommonConfig.openInputStream("MimeHandlers.properties");
097: if (in != null) {
098: try {
099: Properties handlers = new Properties();
100: Properties hints = new Properties();
101: handlers.load(in);
102: Set entries = handlers.entrySet();
103: for (Iterator iter = entries.iterator(); iter.hasNext();) {
104: Map.Entry handlerEntry = (Map.Entry) iter.next();
105: String key = (String) handlerEntry.getKey();
106: if (key.startsWith("handler.")) {
107: try {
108: // Instantiate the content handler.
109: Class clazz = Class
110: .forName((String) handlerEntry
111: .getValue());
112: ContentHandler handler = (ContentHandler) clazz
113: .newInstance();
114:
115: // Extract hints and initialize the handler.
116: String contentType = key.substring(8);
117: String hintPrefix = "hint." + contentType;
118: hints.clear();
119: for (Iterator iter2 = entries.iterator(); iter2
120: .hasNext();) {
121:
122: Map.Entry hintEntry = (Map.Entry) iter2
123: .next();
124:
125: String v = (String) hintEntry.getKey();
126: if (v.startsWith(hintPrefix)) {
127: hints.setProperty(v.substring(5),
128: (String) hintEntry
129: .getValue());
130: }
131: }
132: handler.init(getServiceManager(), hints);
133:
134: // If all went well, register the handler.
135: _contentHandlers.put(contentType, handler);
136: } catch (ClassNotFoundException e) {
137: getLogger().error(e);
138: } catch (IllegalAccessException e) {
139: getLogger().error(e);
140: } catch (InstantiationException e) {
141: getLogger().error(e);
142: } catch (ClassCastException e) {
143: getLogger().error(e);
144: }
145: }
146: }
147: } finally {
148: in.close();
149: }
150: }
151: }
152:
153: public boolean supportsKeyBasedSubscriptions() {
154: return SUPPORTS_KEY_BASED_SUBSCRIPTIONS;
155: }
156:
157: public void createEventType(EventTypeMetaData eventType)
158: throws RepositoryException {
159:
160: createEntry(eventType);
161: }
162:
163: public void deleteEventType(String eventType)
164: throws RepositoryException {
165: deleteEntry(eventType);
166: }
167:
168: public void updateEventType(EventTypeMetaData eventType)
169: throws RepositoryException {
170:
171: updateEntry(eventType.getId(), eventType);
172: }
173:
174: public EventTypeMetaData[] findEventTypeMetaData()
175: throws RepositoryException {
176:
177: return (EventTypeMetaData[]) findMetaData();
178: }
179:
180: public EventTypeMetaData findEventTypeMetaData(String eventId)
181: throws RepositoryException {
182:
183: return (EventTypeMetaData) findMetaData(eventId, true);
184: }
185:
186: public final void addApplicationEventListener(
187: ApplicationEventListener listener) {
188:
189: if (_listeners.contains(listener)) {
190: throw new IllegalArgumentException(
191: "Event listener is already registered: " + listener);
192: }
193: if (getLogger().isDebugEnabled())
194: getLogger().debug(
195: "addApplicationEventListener(" + listener + ')');
196:
197: _listeners.add(listener);
198: }
199:
200: public final void removeApplicationEventListener(
201: ApplicationEventListener listener) {
202:
203: if (!_listeners.remove(listener)) {
204: throw new IllegalArgumentException(
205: "Event listener is not registered: " + listener);
206: }
207:
208: if (getLogger().isDebugEnabled())
209: getLogger().debug(
210: "removeApplicationEventListener(" + listener + ')');
211: }
212:
213: protected String getContentType(Object data, Map attrs) {
214: // Check whether content type was specified explicitly.
215: String contentType = (String) attrs
216: .get(ApplicationEvent.CONTENT_TYPE);
217: if (contentType == null) {
218: // If not explicit, determine content type by introspection.
219: // First check to see whether we've already introspected this class.
220: Class clazz = data == null ? Object.class : data.getClass();
221: String className = clazz.getName();
222: synchronized (_mimeClasses) {
223: contentType = (String) _mimeClasses.get(className);
224: }
225:
226: // If we haven't, select the first compatible MIME type by class.
227: if (contentType == null) {
228: Iterator iter = _mimeTypeForClass.entrySet().iterator();
229: while (iter.hasNext()) {
230: Map.Entry entry = (Map.Entry) iter.next();
231: Class mimeClass = (Class) entry.getKey();
232: if (mimeClass.isAssignableFrom(clazz)) {
233: contentType = (String) entry.getValue();
234: break;
235: }
236: }
237: if (contentType == null)
238: contentType = MIMETypes.JAVA_OBJECT;
239: synchronized (_mimeClasses) {
240: _mimeClasses.put(className, contentType);
241: }
242: }
243: }
244:
245: return contentType;
246: }
247:
248: protected ContentHandler getContentHandler(String contentType) {
249: Object handler = _contentHandlers.get(contentType);
250: if (handler == null) {
251: // If contentType includes the subtype, get the base type handler.
252: int n = contentType.indexOf('/');
253: if (n != -1)
254: handler = _contentHandlers.get(contentType.substring(0,
255: n));
256: // If no handler is defined, use the default Java object handler.
257: if (handler == null)
258: handler = _contentHandlers.get(MIMETypes.JAVA_OBJECT);
259: }
260: return (ContentHandler) handler;
261: }
262:
263: protected Evaluator getEvaluatorFor(EventTypeMetaData metaData)
264: throws RepositoryException {
265:
266: Evaluator evaluator;
267: String scriptType = metaData.getScriptType();
268: if (scriptType == null)
269: scriptType = ServerConfig.getScriptType();
270: evaluator = _svcMgr.getEvaluatorFactory().findEvaluator(
271: scriptType);
272: return evaluator;
273: }
274:
275: public void publish(Object source, Map attrs)
276: throws RepositoryException, EvaluatorException {
277:
278: if (source == null)
279: throw new IllegalArgumentException("data cannot be null");
280: if (attrs == null)
281: attrs = Collections.EMPTY_MAP;
282:
283: if (getLogger().isDebugEnabled())
284: getLogger().debug("publish(" + source + ", " + attrs + ')');
285:
286: // Determine the content type of the raw event data.
287: String contentType = getContentType(source, attrs);
288:
289: // Get the content handler for this type.
290: ContentHandler handler = getContentHandler(contentType);
291:
292: // If the content type doesn't include a subtype, ask the handler.
293: int n = contentType.indexOf('/');
294: if (n == -1)
295: contentType = handler.getContentType(source);
296:
297: // Call the appropriate content handler to pre-process it and wrap the
298: // raw event data into one or more application events.
299: ApplicationEvent[] events = handler.process(source, attrs,
300: contentType);
301:
302: // Conditional and event key expressions must be evaluated against an
303: // EngineContext.
304: EngineContext ctx = EngineContext.pushContext(
305: getServiceManager().getEngine(), null, null);
306: try {
307: for (int i = 0; i < events.length; i++) {
308: ApplicationEvent event = events[i];
309: ctx.setEvent(event);
310:
311: contentType = event.getContentType();
312: String schema = event.getSchema();
313: String action = event.getAction();
314:
315: // Classify into a business event type.
316: String eventType = null;
317: EventTypeMetaData metaData = null;
318: Evaluator evaluator = null;
319:
320: // Use content type and schema to look up the event type.
321: EventTypeMetaData[] eventTypes = findEventTypeMetaData();
322: for (int j = 0; j < eventTypes.length; j++) {
323: metaData = eventTypes[j];
324:
325: // Skip this event type record if it doesn't match on
326: // content type, schema and action.
327: if (!metaData.getContentType().equals(contentType)
328: || !metaData.getSchema().equals(schema)
329: || !Comparators.equals(
330: metaData.getAction(), action)) {
331:
332: continue;
333: }
334:
335: // If there's a qualifying condition, evaluate it.
336: Condition condition = metaData.getCondition();
337: if (condition != null) {
338: evaluator = getEvaluatorFor(metaData);
339: if (!evaluator.evaluateCondition(condition
340: .getValue(), ctx)) {
341:
342: // Condition yielded false - no match.
343: evaluator = null;
344: continue;
345: }
346: }
347:
348: eventType = metaData.getId();
349: event.setEventType(eventType);
350: break;
351: }
352:
353: // If this contentType/schema/action didn't match an event type,
354: // ignore it.
355: if (eventType == null) {
356: getLogger()
357: .info(
358: "ApplicationEvent with contentType='"
359: + contentType
360: + "', schema='"
361: + schema
362: + "' and action="
363: + action
364: + " does not match any known event type. The "
365: + "event will be discarded");
366: continue;
367: }
368:
369: if (getLogger().isDebugEnabled())
370: getLogger().debug("Found event type: " + eventType);
371:
372: // Make sure we have an evaluator for this event type.
373: if (evaluator == null)
374: evaluator = getEvaluatorFor(metaData);
375:
376: // Compute the event keys.
377: EventParameter[] parms = metaData.getFormalParameter();
378: Serializable[] eventKeys = new Serializable[parms.length];
379: for (int k = 0; k < parms.length; k++) {
380: String keyExpr = parms[k].getKeyExpression();
381: Object key = evaluator.evaluateExpression(keyExpr,
382: ctx);
383: if (!(key instanceof Serializable)) {
384: throw new EvaluatorException(
385: "Key expression '"
386: + keyExpr
387: + "' for event type '"
388: + eventType
389: + "' does not yield a serializable value");
390: }
391: eventKeys[k] = (Serializable) key;
392: }
393:
394: if (getLogger().isDebugEnabled()) {
395: getLogger().debug(
396: "Computed event keys: "
397: + Arrays.asList(eventKeys));
398: }
399: event.setKeys(eventKeys);
400:
401: // Compute the event's expiration date, if applicable.
402: if (metaData.getTimeToLive() != null) {
403: Date expiry = WorkflowEngineUtilities
404: .calculateExpiryDate(metaData
405: .getTimeToLive(), metaData
406: .getCalendar(), _svcMgr
407: .getCalendarFactory());
408: event.setExpiry(expiry);
409:
410: if (getLogger().isDebugEnabled())
411: getLogger().debug(
412: "Computed expiry date: " + expiry);
413: }
414:
415: // Forward the event to the listeners.
416: publish(event, metaData, evaluator, ctx);
417: }
418: } finally {
419: EngineContext.popContext();
420: }
421: }
422:
423: public final void publish(ApplicationEvent event)
424: throws RepositoryException, EvaluatorException {
425:
426: EngineContext ctx = EngineContext.pushContext(
427: getServiceManager().getEngine(), event);
428: try {
429: EventTypeMetaData metaData = findEventTypeMetaData(event
430: .getEventType());
431: Evaluator evaluator = getEvaluatorFor(metaData);
432: publish(event, metaData, evaluator, ctx);
433: } finally {
434: EngineContext.popContext();
435: }
436: }
437:
438: protected final void publish(ApplicationEvent event,
439: EventTypeMetaData metaData, Evaluator evaluator,
440: WorkflowContext ctx) throws RepositoryException,
441: EvaluatorException {
442:
443: if (getLogger().isDebugEnabled())
444: getLogger().debug("publish(" + event + ')');
445:
446: // Look for matching subscriptions.
447: int matches = 0;
448: Collection subscriptions = findSubscriptions(event, metaData);
449: synchronized (subscriptions) {
450: for (Iterator iter = subscriptions.iterator(); iter
451: .hasNext();) {
452: Object o = iter.next();
453:
454: // Temporal events are injected directly to the
455: // fireApplicationEvent() method.
456: if (!(o instanceof ApplicationEventSubscription))
457: continue;
458:
459: // Application events must be matched against all subscriptions
460: // for this event type.
461: ApplicationEventSubscription subscription = (ApplicationEventSubscription) o;
462:
463: if (matchMaybeEnqueue(event, subscription, evaluator,
464: ctx)) {
465: if (getLogger().isDebugEnabled())
466: getLogger().debug(
467: "Found a matching subscription");
468:
469: matches++;
470: }
471: }
472: }
473: if (getLogger().isDebugEnabled())
474: getLogger().debug(
475: "Found " + matches + " matching subscriptions");
476:
477: // If the event has not yet expired, store it for later consumption.
478: Date expiry = event.getExpiry();
479: if (expiry != null
480: && expiry.getTime() >= System.currentTimeMillis())
481: storeEvent(event, metaData);
482: }
483:
484: /**
485: * Attempts to match an event to a subscription. If successful, enqueues
486: * the pair for fulfillment and {@link EventSubscription#cancel cancels} the
487: * subscription if it has been exhausted. This method assumes that the event
488: * and subscription already match on event type (and also event keys if the
489: * subscription supplies them).
490: *
491: * @param event The event.
492: * @param subscription The subscription.
493: * @param evaluator The evaluator for this type of event.
494: * @param ctx The workflow context.
495: * @return true if the event matched the subscription.
496: * @throws EvaluatorException If an error occurs whilst evaluating any
497: * predicate specified by the subscription.
498: */
499: private boolean matchMaybeEnqueue(ApplicationEvent event,
500: ApplicationEventSubscription subscription,
501: Evaluator evaluator, WorkflowContext ctx)
502: throws EvaluatorException {
503:
504: String predicate = subscription.getPredicate();
505: boolean match = predicate == null
506: || evaluator.evaluateCondition(predicate, ctx);
507:
508: if (match) {
509: // Extract the correlation keys before possible cancellation.
510: String[] correlationKeys = subscription
511: .getCorrelationKeys();
512:
513: // If the subscription has been exhausted, cancel it.
514: if (subscription.decrementCount() == 0)
515: subscription.cancel();
516:
517: // Enqueue the matched pair for asynchronous notification.
518: getServiceManager().getAsyncManager().asyncRequest(
519: new AsyncFireApplicationEvent(event,
520: correlationKeys));
521: }
522:
523: return match;
524: }
525:
526: public final void fireApplicationEvent(ApplicationEvent event,
527: String[] correlationKeys) {
528:
529: // Inform event listeners that the subscription has been fulfilled.
530: for (Iterator iter = _listeners.iterator(); iter.hasNext();) {
531: ApplicationEventListener listener = (ApplicationEventListener) iter
532: .next();
533: listener.onEvent(event, correlationKeys);
534: }
535: }
536:
537: /**
538: * Stores an event for later consumption. This method is called when a
539: * published event is still valid according to its expiry date. Events with
540: * null expiry dates are not stored.
541: *
542: * @param event The event to store.
543: * @param metaData The meta data for this event.
544: * @see #findStoredEvents(String,Object[])
545: */
546: protected abstract void storeEvent(ApplicationEvent event,
547: EventTypeMetaData metaData) throws RepositoryException;
548:
549: /**
550: * Locates matching stored events.
551: *
552: * @param eventType The event type.
553: * @param subscriptionKeys The event keys supplied with the subscription.
554: * @return The matching stored events.
555: * @see #storeEvent(ApplicationEvent,EventTypeMetaData)
556: */
557: protected abstract Collection findStoredEvents(String eventType,
558: Object[] subscriptionKeys) throws RepositoryException;
559:
560: /**
561: * Finds application event subscriptions that match on event type and, for
562: * key-based subscriptions, event keys. Conditions defined on the
563: * subscription will be evaluated by the caller since the callee does not
564: * have sufficient information to do this.
565: *
566: * @param event The event for which matching subscriptions are to be found.
567: * @param metaData Metadata for this event.
568: * @return A collection of matching {@link EventSubscription}.
569: * @throws RepositoryException
570: */
571: protected abstract Collection findSubscriptions(
572: ApplicationEvent event, EventTypeMetaData metaData)
573: throws RepositoryException;
574:
575: public final TemporalEventSubscription subscribe(String eventType,
576: Date effective, String[] correlationKeys)
577: throws RepositoryException {
578:
579: return subscribe(eventType, effective, null, 1, null, null,
580: true, correlationKeys);
581: }
582:
583: public final ApplicationEventSubscription subscribe(
584: String eventType, Object[] eventKeys, String predicate,
585: Date effective, Date expiry, int count,
586: String[] correlationKeys, WorkflowContext ctx)
587: throws RepositoryException {
588:
589: if (supportsKeyBasedSubscriptions()) {
590: if (eventKeys == null && predicate != null
591: && !_predicateWarningIssued) {
592:
593: getLogger()
594: .warn(
595: "Predicate-based subscriptions are non-performant and "
596: + "non-scalable. Key-based subscriptions are recommended");
597: _predicateWarningIssued = true;
598: }
599: } else if (eventKeys != null) {
600: throw new UnsupportedOperationException(
601: "Key-based subscriptions are not supported.");
602: }
603:
604: if (getLogger().isDebugEnabled()) {
605: getLogger().debug(
606: "subscribe("
607: + eventType
608: + ", "
609: + (eventKeys == null ? null : Arrays
610: .asList(eventKeys)) + ", "
611: + predicate + ", " + effective + ", "
612: + expiry + ", "
613: + Arrays.asList(correlationKeys) + ')');
614: }
615:
616: ApplicationEventSubscription subscription = createSubscription(
617: eventType, eventKeys, predicate, effective, expiry,
618: count, correlationKeys);
619:
620: // Fetch any matching stored events.
621: Collection events = findStoredEvents(eventType, eventKeys);
622: if (!events.isEmpty()) {
623: EventTypeMetaData metaData = findEventTypeMetaData(eventType);
624: Evaluator evaluator = getServiceManager()
625: .getEvaluatorFactory().findEvaluator(
626: metaData.getScriptType());
627: try {
628: for (Iterator iter = events.iterator(); iter.hasNext();) {
629: ApplicationEvent event = (ApplicationEvent) iter
630: .next();
631: if (matchMaybeEnqueue(event, subscription,
632: evaluator, ctx)) {
633: if (getLogger().isDebugEnabled())
634: getLogger().debug("Found a matching event");
635: }
636: }
637: } catch (EvaluatorException e) {
638: throw new RepositoryException(e);
639: }
640: }
641:
642: return subscription;
643: }
644:
645: public final TemporalEventSubscription subscribe(String eventType,
646: Date effective, Date expiry, int count, Duration interval,
647: String calendar, boolean recoverable,
648: String[] correlationKeys) throws RepositoryException {
649:
650: if (getLogger().isDebugEnabled()) {
651: getLogger().debug(
652: "subscribe(" + eventType + ", " + effective + ", "
653: + expiry + ", " + count + ", " + interval
654: + ", " + calendar + ", "
655: + Arrays.asList(correlationKeys) + ')');
656: }
657:
658: return createSubscription(eventType, effective, expiry, count,
659: interval, calendar, recoverable, correlationKeys);
660: }
661:
662: protected abstract ApplicationEventSubscription createSubscription(
663: String eventType, Object[] eventKeys, String predicate,
664: Date effective, Date expiry, int count,
665: String[] correlationKeys) throws RepositoryException;
666:
667: protected abstract TemporalEventSubscription createSubscription(
668: String eventType, Date effective, Date expiry, int count,
669: Duration interval, String calendar, boolean recoverable,
670: String[] correlationKeys) throws RepositoryException;
671: }
|