Source Code Cross Referenced for AbstractApplicationEventBroker.java in  » Workflow-Engines » obe-1.0 » org » obe » event » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » obe 1.0 » org.obe.event 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.obe.event;
002:
003:        import org.obe.client.api.model.MIMETypes;
004:        import org.obe.client.api.repository.EventParameter;
005:        import org.obe.client.api.repository.EventTypeMetaData;
006:        import org.obe.client.api.repository.RepositoryException;
007:        import org.obe.engine.EngineContext;
008:        import org.obe.engine.WorkflowEngineUtilities;
009:        import org.obe.engine.repository.AbstractRepository;
010:        import org.obe.engine.util.Comparators;
011:        import org.obe.spi.WorkflowContext;
012:        import org.obe.spi.evaluator.Evaluator;
013:        import org.obe.spi.evaluator.EvaluatorException;
014:        import org.obe.spi.event.ApplicationEvent;
015:        import org.obe.spi.event.ApplicationEventListener;
016:        import org.obe.spi.service.ApplicationEventBroker;
017:        import org.obe.spi.service.ServerConfig;
018:        import org.obe.spi.service.ServiceManager;
019:        import org.obe.spi.util.AsyncFireApplicationEvent;
020:        import org.obe.util.CommonConfig;
021:        import org.obe.xpdl.model.condition.Condition;
022:        import org.obe.xpdl.model.misc.Duration;
023:
024:        import java.io.IOException;
025:        import java.io.InputStream;
026:        import java.io.Serializable;
027:        import java.util.*;
028:
029:        /*
030:         Problems still to solve:
031:         1. how to prevent two threads or VMs from processing the same subscriptions.
032:         ==> rely on database locking ==> SELECT FOR UPDATE, etc.
033:         2. how to ensure that an inbound event is not lost if a matching subscription
034:         has not yet been created.
035:         ==> persist the event with its key hash.
036:         */
037:
038:        /**
039:         * Abstract event broker that stores its event definitions in XML. Subclasses
040:         * provide subscription matching and the storage of events and subscriptions.
041:         *
042:         * @author Adrian Price
043:         */
044:        public abstract class AbstractApplicationEventBroker extends
045:                AbstractRepository implements  ApplicationEventBroker {
046:
// Whether key-based event subscriptions are supported; read once from
// ServerConfig when this class is initialized.
private static final boolean SUPPORTS_KEY_BASED_SUBSCRIPTIONS = ServerConfig
        .supportsKeyBasedEventSubscriptions();

// Set by subscribe() after it has logged its warning about predicate-based
// subscriptions, so that the warning is not repeated.
protected boolean _predicateWarningIssued;
// Registered ApplicationEventListeners, notified by fireApplicationEvent().
private final List _listeners = new ArrayList();
// Class -> MIME type string; populated by init() from
// Class2MimeType.properties.
private final Map _mimeTypeForClass = new HashMap();
// Class name -> resolved content type; filled lazily by getContentType(),
// which synchronizes on this map for every access.
private final Map _mimeClasses = new HashMap();
// Content type -> ContentHandler; populated by init() from
// MimeHandlers.properties.
private final Map _contentHandlers = new HashMap();
055:
/**
 * Creates the broker, handing {@link EventTypeMetaData} to the
 * {@link AbstractRepository} constructor as the entry class.
 *
 * @param svcMgr The service manager, passed through to the superclass.
 */
protected AbstractApplicationEventBroker(ServiceManager svcMgr) {
    super(svcMgr, EventTypeMetaData.class);
}
059:
060:            public String getServiceName() {
061:                return SERVICE_NAME;
062:            }
063:
064:            public synchronized void init() throws IOException,
065:                    RepositoryException {
066:                super .init();
067:
068:                // Load the MIME type mappings.
069:                Properties mimeTypes = new Properties();
070:                InputStream in = CommonConfig
071:                        .openInputStream("Class2MimeType.properties");
072:                if (in != null) {
073:                    try {
074:                        mimeTypes.load(in);
075:                    } finally {
076:                        in.close();
077:                    }
078:                    for (Iterator iter = mimeTypes.entrySet().iterator(); iter
079:                            .hasNext();) {
080:                        Map.Entry entry = (Map.Entry) iter.next();
081:                        String className = (String) entry.getKey();
082:                        String mimeType = (String) entry.getValue();
083:                        try {
084:                            Class clazz = Class.forName(className);
085:                            _mimeTypeForClass.put(clazz, entry.getValue());
086:                        } catch (ClassNotFoundException e) {
087:                            getLogger().error(
088:                                    "Unable to load class '" + className
089:                                            + "' for MIME type '" + mimeType
090:                                            + '\'');
091:                        }
092:                    }
093:                }
094:
095:                // Load the MIME content handlers.
096:                in = CommonConfig.openInputStream("MimeHandlers.properties");
097:                if (in != null) {
098:                    try {
099:                        Properties handlers = new Properties();
100:                        Properties hints = new Properties();
101:                        handlers.load(in);
102:                        Set entries = handlers.entrySet();
103:                        for (Iterator iter = entries.iterator(); iter.hasNext();) {
104:                            Map.Entry handlerEntry = (Map.Entry) iter.next();
105:                            String key = (String) handlerEntry.getKey();
106:                            if (key.startsWith("handler.")) {
107:                                try {
108:                                    // Instantiate the content handler.
109:                                    Class clazz = Class
110:                                            .forName((String) handlerEntry
111:                                                    .getValue());
112:                                    ContentHandler handler = (ContentHandler) clazz
113:                                            .newInstance();
114:
115:                                    // Extract hints and initialize the handler.
116:                                    String contentType = key.substring(8);
117:                                    String hintPrefix = "hint." + contentType;
118:                                    hints.clear();
119:                                    for (Iterator iter2 = entries.iterator(); iter2
120:                                            .hasNext();) {
121:
122:                                        Map.Entry hintEntry = (Map.Entry) iter2
123:                                                .next();
124:
125:                                        String v = (String) hintEntry.getKey();
126:                                        if (v.startsWith(hintPrefix)) {
127:                                            hints.setProperty(v.substring(5),
128:                                                    (String) hintEntry
129:                                                            .getValue());
130:                                        }
131:                                    }
132:                                    handler.init(getServiceManager(), hints);
133:
134:                                    // If all went well, register the handler.
135:                                    _contentHandlers.put(contentType, handler);
136:                                } catch (ClassNotFoundException e) {
137:                                    getLogger().error(e);
138:                                } catch (IllegalAccessException e) {
139:                                    getLogger().error(e);
140:                                } catch (InstantiationException e) {
141:                                    getLogger().error(e);
142:                                } catch (ClassCastException e) {
143:                                    getLogger().error(e);
144:                                }
145:                            }
146:                        }
147:                    } finally {
148:                        in.close();
149:                    }
150:                }
151:            }
152:
153:            public boolean supportsKeyBasedSubscriptions() {
154:                return SUPPORTS_KEY_BASED_SUBSCRIPTIONS;
155:            }
156:
157:            public void createEventType(EventTypeMetaData eventType)
158:                    throws RepositoryException {
159:
160:                createEntry(eventType);
161:            }
162:
163:            public void deleteEventType(String eventType)
164:                    throws RepositoryException {
165:                deleteEntry(eventType);
166:            }
167:
168:            public void updateEventType(EventTypeMetaData eventType)
169:                    throws RepositoryException {
170:
171:                updateEntry(eventType.getId(), eventType);
172:            }
173:
174:            public EventTypeMetaData[] findEventTypeMetaData()
175:                    throws RepositoryException {
176:
177:                return (EventTypeMetaData[]) findMetaData();
178:            }
179:
180:            public EventTypeMetaData findEventTypeMetaData(String eventId)
181:                    throws RepositoryException {
182:
183:                return (EventTypeMetaData) findMetaData(eventId, true);
184:            }
185:
186:            public final void addApplicationEventListener(
187:                    ApplicationEventListener listener) {
188:
189:                if (_listeners.contains(listener)) {
190:                    throw new IllegalArgumentException(
191:                            "Event listener is already registered: " + listener);
192:                }
193:                if (getLogger().isDebugEnabled())
194:                    getLogger().debug(
195:                            "addApplicationEventListener(" + listener + ')');
196:
197:                _listeners.add(listener);
198:            }
199:
200:            public final void removeApplicationEventListener(
201:                    ApplicationEventListener listener) {
202:
203:                if (!_listeners.remove(listener)) {
204:                    throw new IllegalArgumentException(
205:                            "Event listener is not registered: " + listener);
206:                }
207:
208:                if (getLogger().isDebugEnabled())
209:                    getLogger().debug(
210:                            "removeApplicationEventListener(" + listener + ')');
211:            }
212:
213:            protected String getContentType(Object data, Map attrs) {
214:                // Check whether content type was specified explicitly.
215:                String contentType = (String) attrs
216:                        .get(ApplicationEvent.CONTENT_TYPE);
217:                if (contentType == null) {
218:                    // If not explicit, determine content type by introspection.
219:                    // First check to see whether we've already introspected this class.
220:                    Class clazz = data == null ? Object.class : data.getClass();
221:                    String className = clazz.getName();
222:                    synchronized (_mimeClasses) {
223:                        contentType = (String) _mimeClasses.get(className);
224:                    }
225:
226:                    // If we haven't, select the first compatible MIME type by class.
227:                    if (contentType == null) {
228:                        Iterator iter = _mimeTypeForClass.entrySet().iterator();
229:                        while (iter.hasNext()) {
230:                            Map.Entry entry = (Map.Entry) iter.next();
231:                            Class mimeClass = (Class) entry.getKey();
232:                            if (mimeClass.isAssignableFrom(clazz)) {
233:                                contentType = (String) entry.getValue();
234:                                break;
235:                            }
236:                        }
237:                        if (contentType == null)
238:                            contentType = MIMETypes.JAVA_OBJECT;
239:                        synchronized (_mimeClasses) {
240:                            _mimeClasses.put(className, contentType);
241:                        }
242:                    }
243:                }
244:
245:                return contentType;
246:            }
247:
248:            protected ContentHandler getContentHandler(String contentType) {
249:                Object handler = _contentHandlers.get(contentType);
250:                if (handler == null) {
251:                    // If contentType includes the subtype, get the base type handler.
252:                    int n = contentType.indexOf('/');
253:                    if (n != -1)
254:                        handler = _contentHandlers.get(contentType.substring(0,
255:                                n));
256:                    // If no handler is defined, use the default Java object handler.
257:                    if (handler == null)
258:                        handler = _contentHandlers.get(MIMETypes.JAVA_OBJECT);
259:                }
260:                return (ContentHandler) handler;
261:            }
262:
263:            protected Evaluator getEvaluatorFor(EventTypeMetaData metaData)
264:                    throws RepositoryException {
265:
266:                Evaluator evaluator;
267:                String scriptType = metaData.getScriptType();
268:                if (scriptType == null)
269:                    scriptType = ServerConfig.getScriptType();
270:                evaluator = _svcMgr.getEvaluatorFactory().findEvaluator(
271:                        scriptType);
272:                return evaluator;
273:            }
274:
275:            public void publish(Object source, Map attrs)
276:                    throws RepositoryException, EvaluatorException {
277:
278:                if (source == null)
279:                    throw new IllegalArgumentException("data cannot be null");
280:                if (attrs == null)
281:                    attrs = Collections.EMPTY_MAP;
282:
283:                if (getLogger().isDebugEnabled())
284:                    getLogger().debug("publish(" + source + ", " + attrs + ')');
285:
286:                // Determine the content type of the raw event data.
287:                String contentType = getContentType(source, attrs);
288:
289:                // Get the content handler for this type.
290:                ContentHandler handler = getContentHandler(contentType);
291:
292:                // If the content type doesn't include a subtype, ask the handler.
293:                int n = contentType.indexOf('/');
294:                if (n == -1)
295:                    contentType = handler.getContentType(source);
296:
297:                // Call the appropriate content handler to pre-process it and wrap the
298:                // raw event data into one or more application events.
299:                ApplicationEvent[] events = handler.process(source, attrs,
300:                        contentType);
301:
302:                // Conditional and event key expressions must be evaluated against an
303:                // EngineContext.
304:                EngineContext ctx = EngineContext.pushContext(
305:                        getServiceManager().getEngine(), null, null);
306:                try {
307:                    for (int i = 0; i < events.length; i++) {
308:                        ApplicationEvent event = events[i];
309:                        ctx.setEvent(event);
310:
311:                        contentType = event.getContentType();
312:                        String schema = event.getSchema();
313:                        String action = event.getAction();
314:
315:                        // Classify into a business event type.
316:                        String eventType = null;
317:                        EventTypeMetaData metaData = null;
318:                        Evaluator evaluator = null;
319:
320:                        // Use content type and schema to look up the event type.
321:                        EventTypeMetaData[] eventTypes = findEventTypeMetaData();
322:                        for (int j = 0; j < eventTypes.length; j++) {
323:                            metaData = eventTypes[j];
324:
325:                            // Skip this event type record if it doesn't match on
326:                            // content type, schema and action.
327:                            if (!metaData.getContentType().equals(contentType)
328:                                    || !metaData.getSchema().equals(schema)
329:                                    || !Comparators.equals(
330:                                            metaData.getAction(), action)) {
331:
332:                                continue;
333:                            }
334:
335:                            // If there's a qualifying condition, evaluate it.
336:                            Condition condition = metaData.getCondition();
337:                            if (condition != null) {
338:                                evaluator = getEvaluatorFor(metaData);
339:                                if (!evaluator.evaluateCondition(condition
340:                                        .getValue(), ctx)) {
341:
342:                                    // Condition yielded false - no match.
343:                                    evaluator = null;
344:                                    continue;
345:                                }
346:                            }
347:
348:                            eventType = metaData.getId();
349:                            event.setEventType(eventType);
350:                            break;
351:                        }
352:
353:                        // If this contentType/schema/action didn't match an event type,
354:                        // ignore it.
355:                        if (eventType == null) {
356:                            getLogger()
357:                                    .info(
358:                                            "ApplicationEvent with contentType='"
359:                                                    + contentType
360:                                                    + "', schema='"
361:                                                    + schema
362:                                                    + "' and action="
363:                                                    + action
364:                                                    + " does not match any known event type. The "
365:                                                    + "event will be discarded");
366:                            continue;
367:                        }
368:
369:                        if (getLogger().isDebugEnabled())
370:                            getLogger().debug("Found event type: " + eventType);
371:
372:                        // Make sure we have an evaluator for this event type.
373:                        if (evaluator == null)
374:                            evaluator = getEvaluatorFor(metaData);
375:
376:                        // Compute the event keys.
377:                        EventParameter[] parms = metaData.getFormalParameter();
378:                        Serializable[] eventKeys = new Serializable[parms.length];
379:                        for (int k = 0; k < parms.length; k++) {
380:                            String keyExpr = parms[k].getKeyExpression();
381:                            Object key = evaluator.evaluateExpression(keyExpr,
382:                                    ctx);
383:                            if (!(key instanceof  Serializable)) {
384:                                throw new EvaluatorException(
385:                                        "Key expression '"
386:                                                + keyExpr
387:                                                + "' for event type '"
388:                                                + eventType
389:                                                + "' does not yield a serializable value");
390:                            }
391:                            eventKeys[k] = (Serializable) key;
392:                        }
393:
394:                        if (getLogger().isDebugEnabled()) {
395:                            getLogger().debug(
396:                                    "Computed event keys: "
397:                                            + Arrays.asList(eventKeys));
398:                        }
399:                        event.setKeys(eventKeys);
400:
401:                        // Compute the event's expiration date, if applicable.
402:                        if (metaData.getTimeToLive() != null) {
403:                            Date expiry = WorkflowEngineUtilities
404:                                    .calculateExpiryDate(metaData
405:                                            .getTimeToLive(), metaData
406:                                            .getCalendar(), _svcMgr
407:                                            .getCalendarFactory());
408:                            event.setExpiry(expiry);
409:
410:                            if (getLogger().isDebugEnabled())
411:                                getLogger().debug(
412:                                        "Computed expiry date: " + expiry);
413:                        }
414:
415:                        // Forward the event to the listeners.
416:                        publish(event, metaData, evaluator, ctx);
417:                    }
418:                } finally {
419:                    EngineContext.popContext();
420:                }
421:            }
422:
423:            public final void publish(ApplicationEvent event)
424:                    throws RepositoryException, EvaluatorException {
425:
426:                EngineContext ctx = EngineContext.pushContext(
427:                        getServiceManager().getEngine(), event);
428:                try {
429:                    EventTypeMetaData metaData = findEventTypeMetaData(event
430:                            .getEventType());
431:                    Evaluator evaluator = getEvaluatorFor(metaData);
432:                    publish(event, metaData, evaluator, ctx);
433:                } finally {
434:                    EngineContext.popContext();
435:                }
436:            }
437:
438:            protected final void publish(ApplicationEvent event,
439:                    EventTypeMetaData metaData, Evaluator evaluator,
440:                    WorkflowContext ctx) throws RepositoryException,
441:                    EvaluatorException {
442:
443:                if (getLogger().isDebugEnabled())
444:                    getLogger().debug("publish(" + event + ')');
445:
446:                // Look for matching subscriptions.
447:                int matches = 0;
448:                Collection subscriptions = findSubscriptions(event, metaData);
449:                synchronized (subscriptions) {
450:                    for (Iterator iter = subscriptions.iterator(); iter
451:                            .hasNext();) {
452:                        Object o = iter.next();
453:
454:                        // Temporal events are injected directly to the
455:                        // fireApplicationEvent() method.
456:                        if (!(o instanceof  ApplicationEventSubscription))
457:                            continue;
458:
459:                        // Application events must be matched against all subscriptions
460:                        // for this event type.
461:                        ApplicationEventSubscription subscription = (ApplicationEventSubscription) o;
462:
463:                        if (matchMaybeEnqueue(event, subscription, evaluator,
464:                                ctx)) {
465:                            if (getLogger().isDebugEnabled())
466:                                getLogger().debug(
467:                                        "Found a matching subscription");
468:
469:                            matches++;
470:                        }
471:                    }
472:                }
473:                if (getLogger().isDebugEnabled())
474:                    getLogger().debug(
475:                            "Found " + matches + " matching subscriptions");
476:
477:                // If the event has not yet expired, store it for later consumption.
478:                Date expiry = event.getExpiry();
479:                if (expiry != null
480:                        && expiry.getTime() >= System.currentTimeMillis())
481:                    storeEvent(event, metaData);
482:            }
483:
484:            /**
485:             * Attempts to match an event to a subscription.  If successful, enqueues
486:             * the pair for fulfillment and {@link EventSubscription#cancel cancels} the
487:             * subscription if it has been exhausted. This method assumes that the event
488:             * and subscription already match on event type (and also event keys if the
489:             * subscription supplies them).
490:             *
491:             * @param event        The event.
492:             * @param subscription The subscription.
493:             * @param evaluator    The evaluator for this type of event.
494:             * @param ctx          The workflow context.
495:             * @return true if the event matched the subscription.
496:             * @throws EvaluatorException If an error occurs whilst evaluating any
497:             *                            predicate specified by the subscription.
498:             */
499:            private boolean matchMaybeEnqueue(ApplicationEvent event,
500:                    ApplicationEventSubscription subscription,
501:                    Evaluator evaluator, WorkflowContext ctx)
502:                    throws EvaluatorException {
503:
504:                String predicate = subscription.getPredicate();
505:                boolean match = predicate == null
506:                        || evaluator.evaluateCondition(predicate, ctx);
507:
508:                if (match) {
509:                    // Extract the correlation keys before possible cancellation.
510:                    String[] correlationKeys = subscription
511:                            .getCorrelationKeys();
512:
513:                    // If the subscription has been exhausted, cancel it.
514:                    if (subscription.decrementCount() == 0)
515:                        subscription.cancel();
516:
517:                    // Enqueue the matched pair for asynchronous notification.
518:                    getServiceManager().getAsyncManager().asyncRequest(
519:                            new AsyncFireApplicationEvent(event,
520:                                    correlationKeys));
521:                }
522:
523:                return match;
524:            }
525:
526:            public final void fireApplicationEvent(ApplicationEvent event,
527:                    String[] correlationKeys) {
528:
529:                // Inform event listeners that the subscription has been fulfilled.
530:                for (Iterator iter = _listeners.iterator(); iter.hasNext();) {
531:                    ApplicationEventListener listener = (ApplicationEventListener) iter
532:                            .next();
533:                    listener.onEvent(event, correlationKeys);
534:                }
535:            }
536:
537:            /**
538:             * Stores an event for later consumption. This method is called when a
539:             * published event is still valid according to its expiry date.  Events with
540:             * null expiry dates are not stored.
541:             *
542:             * @param event    The event to store.
543:             * @param metaData The meta data for this event.
544:             * @see #findStoredEvents(String,Object[])
545:             */
546:            protected abstract void storeEvent(ApplicationEvent event,
547:                    EventTypeMetaData metaData) throws RepositoryException;
548:
549:            /**
550:             * Locates matching stored events.
551:             *
552:             * @param eventType        The event type.
553:             * @param subscriptionKeys The event keys supplied with the subscription.
554:             * @return The matching stored events.
555:             * @see #storeEvent(ApplicationEvent,EventTypeMetaData)
556:             */
557:            protected abstract Collection findStoredEvents(String eventType,
558:                    Object[] subscriptionKeys) throws RepositoryException;
559:
560:            /**
561:             * Finds application event subscriptions that match on event type and, for
562:             * key-based subscriptions, event keys.  Conditions defined on the
563:             * subscription will be evaluated by the caller since the callee does not
564:             * have sufficient information to do this.
565:             *
566:             * @param event    The event for which matching subscriptions are to be found.
567:             * @param metaData Metadata for this event.
568:             * @return A collection of matching {@link EventSubscription}.
569:             * @throws RepositoryException
570:             */
571:            protected abstract Collection findSubscriptions(
572:                    ApplicationEvent event, EventTypeMetaData metaData)
573:                    throws RepositoryException;
574:
575:            public final TemporalEventSubscription subscribe(String eventType,
576:                    Date effective, String[] correlationKeys)
577:                    throws RepositoryException {
578:
579:                return subscribe(eventType, effective, null, 1, null, null,
580:                        true, correlationKeys);
581:            }
582:
583:            public final ApplicationEventSubscription subscribe(
584:                    String eventType, Object[] eventKeys, String predicate,
585:                    Date effective, Date expiry, int count,
586:                    String[] correlationKeys, WorkflowContext ctx)
587:                    throws RepositoryException {
588:
589:                if (supportsKeyBasedSubscriptions()) {
590:                    if (eventKeys == null && predicate != null
591:                            && !_predicateWarningIssued) {
592:
593:                        getLogger()
594:                                .warn(
595:                                        "Predicate-based subscriptions are non-performant and "
596:                                                + "non-scalable. Key-based subscriptions are recommended");
597:                        _predicateWarningIssued = true;
598:                    }
599:                } else if (eventKeys != null) {
600:                    throw new UnsupportedOperationException(
601:                            "Key-based subscriptions are not supported.");
602:                }
603:
604:                if (getLogger().isDebugEnabled()) {
605:                    getLogger().debug(
606:                            "subscribe("
607:                                    + eventType
608:                                    + ", "
609:                                    + (eventKeys == null ? null : Arrays
610:                                            .asList(eventKeys)) + ", "
611:                                    + predicate + ", " + effective + ", "
612:                                    + expiry + ", "
613:                                    + Arrays.asList(correlationKeys) + ')');
614:                }
615:
616:                ApplicationEventSubscription subscription = createSubscription(
617:                        eventType, eventKeys, predicate, effective, expiry,
618:                        count, correlationKeys);
619:
620:                // Fetch any matching stored events.
621:                Collection events = findStoredEvents(eventType, eventKeys);
622:                if (!events.isEmpty()) {
623:                    EventTypeMetaData metaData = findEventTypeMetaData(eventType);
624:                    Evaluator evaluator = getServiceManager()
625:                            .getEvaluatorFactory().findEvaluator(
626:                                    metaData.getScriptType());
627:                    try {
628:                        for (Iterator iter = events.iterator(); iter.hasNext();) {
629:                            ApplicationEvent event = (ApplicationEvent) iter
630:                                    .next();
631:                            if (matchMaybeEnqueue(event, subscription,
632:                                    evaluator, ctx)) {
633:                                if (getLogger().isDebugEnabled())
634:                                    getLogger().debug("Found a matching event");
635:                            }
636:                        }
637:                    } catch (EvaluatorException e) {
638:                        throw new RepositoryException(e);
639:                    }
640:                }
641:
642:                return subscription;
643:            }
644:
645:            public final TemporalEventSubscription subscribe(String eventType,
646:                    Date effective, Date expiry, int count, Duration interval,
647:                    String calendar, boolean recoverable,
648:                    String[] correlationKeys) throws RepositoryException {
649:
650:                if (getLogger().isDebugEnabled()) {
651:                    getLogger().debug(
652:                            "subscribe(" + eventType + ", " + effective + ", "
653:                                    + expiry + ", " + count + ", " + interval
654:                                    + ", " + calendar + ", "
655:                                    + Arrays.asList(correlationKeys) + ')');
656:                }
657:
658:                return createSubscription(eventType, effective, expiry, count,
659:                        interval, calendar, recoverable, correlationKeys);
660:            }
661:
662:            protected abstract ApplicationEventSubscription createSubscription(
663:                    String eventType, Object[] eventKeys, String predicate,
664:                    Date effective, Date expiry, int count,
665:                    String[] correlationKeys) throws RepositoryException;
666:
667:            protected abstract TemporalEventSubscription createSubscription(
668:                    String eventType, Date effective, Date expiry, int count,
669:                    Duration interval, String calendar, boolean recoverable,
670:                    String[] correlationKeys) throws RepositoryException;
671:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.