001: /*--
002:
003: Copyright (C) 2002-2005 Adrian Price.
004: All rights reserved.
005:
006: Redistribution and use in source and binary forms, with or without
007: modification, are permitted provided that the following conditions
008: are met:
009:
010: 1. Redistributions of source code must retain the above copyright
011: notice, this list of conditions, and the following disclaimer.
012:
013: 2. Redistributions in binary form must reproduce the above copyright
014: notice, this list of conditions, and the disclaimer that follows
015: these conditions in the documentation and/or other materials
016: provided with the distribution.
017:
018: 3. The names "OBE" and "Open Business Engine" must not be used to
019: endorse or promote products derived from this software without prior
020: written permission. For written permission, please contact
021: adrianprice@sourceforge.net.
022:
023: 4. Products derived from this software may not be called "OBE" or
024: "Open Business Engine", nor may "OBE" or "Open Business Engine"
025: appear in their name, without prior written permission from
026: Adrian Price (adrianprice@users.sourceforge.net).
027:
028: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
029: WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
030: OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
031: DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT,
032: INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
033: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
034: SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
035: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
036: STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
037: IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
038: POSSIBILITY OF SUCH DAMAGE.
039:
040: For more information on OBE, please see
041: <http://obe.sourceforge.net/>.
042:
043: */
044:
045: package org.obe.event;
046:
047: import org.apache.commons.logging.Log;
048: import org.obe.client.api.model.MIMETypes;
049: import org.obe.client.api.repository.EventTypeMetaData;
050: import org.obe.client.api.repository.RepositoryException;
051: import org.obe.engine.util.Comparators;
052: import org.obe.spi.evaluator.EvaluatorException;
053: import org.obe.spi.event.ApplicationEvent;
054: import org.obe.spi.service.ServerConfig;
055: import org.obe.spi.service.ServiceManager;
056: import org.obe.spi.util.ApplicationEventUtil;
057: import org.obe.util.CommonConfig;
058: import org.obe.xpdl.model.condition.ConditionType;
059: import org.obe.xpdl.model.misc.Duration;
060: import org.w3c.dom.Document;
061:
062: import javax.activation.FileTypeMap;
063: import java.io.*;
064: import java.util.*;
065:
066: /**
067: * Provides a basic, non-persistent application event broker service.
068: *
069: * @author Adrian Price
070: */
071: public class BasicApplicationEventBroker extends
072: AbstractApplicationEventBroker {
073:
/** Logger used by the broker and by its inner classes. */
protected static final Log _logger = getLog(BasicApplicationEventBroker.class);

/**
 * Pause, in milliseconds, between two scans of the event directory.
 * Directory polling is only started when this value is positive.
 */
private static final long POLL_INTERVAL = ServerConfig
.getDirectoryPollInterval();
/** Thread that runs the {@link DirectoryPoller}; configured in the constructor and started by {@link #init()}. */
private final Thread _pollerThread = new Thread(
new DirectoryPoller());
/** The <code>events</code> sub-directory of the configuration directory, scanned for event files. */
private final File _eventDir;
/** Maps an event type name to the <code>List</code> of subscriptions registered for it; also used as the lock guarding the map. */
private final Map _subscriptionsByType = new HashMap();
/** Events retained in memory by {@link #storeEvent}; also used as the lock guarding the collection. */
private final Collection _storedEvents = new ArrayList();
/** Daemon timer that runs subscription expiry tasks and temporal event tasks. */
private final Timer _timer = new Timer(true);
084:
085: private static class DelegatingFileTypeMap extends FileTypeMap {
086: FileTypeMap _delegate = FileTypeMap.getDefaultFileTypeMap();
087:
088: public String getContentType(File file) {
089: String filename = file.getName();
090: String ext = filename
091: .substring(filename.lastIndexOf('.') + 1);
092: return ext.equalsIgnoreCase("ser") ? MIMETypes.JAVA_OBJECT
093: : _delegate.getContentType(file);
094: }
095:
096: public String getContentType(String filename) {
097: String ext = filename
098: .substring(filename.lastIndexOf('.') + 1);
099: return ext.equalsIgnoreCase("ser") ? MIMETypes.JAVA_OBJECT
100: : _delegate.getContentType(filename);
101: }
102: }
103:
// When this class is loaded, replace the default file type map with one
// that also recognises .ser files.
static {
    FileTypeMap serAwareMap = new DelegatingFileTypeMap();
    FileTypeMap.setDefaultFileTypeMap(serAwareMap);
}
107:
108: protected abstract class BasicEventSubscription implements
109: EventSubscription {
110:
111: protected final String _eventType;
112: protected final Date _effective;
113: protected final Date _expiry;
114: protected final String[] _correlationKeys;
115: private int _count;
116:
117: protected BasicEventSubscription(String eventType,
118: Date effective, Date expiry, int count,
119: String[] correlationKeys) {
120:
121: _eventType = eventType;
122: _effective = effective;
123: _expiry = expiry;
124: _count = count;
125: _correlationKeys = correlationKeys;
126: }
127:
128: public String getEventType() {
129: return _eventType;
130: }
131:
132: public Date getEffective() {
133: return _effective;
134: }
135:
136: public Date getExpiry() {
137: return _expiry;
138: }
139:
140: public synchronized int getCount() {
141: return _count;
142: }
143:
144: public synchronized int decrementCount() {
145: if (_count > 0)
146: _count--;
147: return _count;
148: }
149:
150: public final String[] getCorrelationKeys() {
151: return _correlationKeys;
152: }
153:
154: /**
155: * Cancels the subscription.
156: */
157: public void cancel() {
158: Collection subsForEventType = findSubscriptions(_eventType);
159: synchronized (subsForEventType) {
160: subsForEventType.remove(this );
161: }
162: }
163: }
164:
165: /**
166: * Implements a basic, in-memory event subscription. As a non-static
167: * inner class this object is not serializable.
168: */
169: protected class BasicApplicationEventSubscription extends
170: BasicEventSubscription implements
171: ApplicationEventSubscription {
172:
173: private final Object[] _eventKeys;
174: private final String _predicate;
175: private final int _conditionType;
176: private TimerTask _expiryTimerTask;
177:
178: BasicApplicationEventSubscription(String eventType,
179: Object[] eventKeys, String predicate, Date effective,
180: Date expiry, int count, String[] correlationKeys) {
181:
182: super (eventType, effective, expiry, count, correlationKeys);
183:
184: ApplicationEventUtil.validateSubscription(eventType,
185: effective, expiry, count, correlationKeys);
186:
187: _eventKeys = ApplicationEventUtil.toStrings(eventKeys);
188: _predicate = predicate;
189: // TODO: specifying condition type here doesn't seem right...
190: _conditionType = ConditionType.CONDITION_INT;
191:
192: // If this application event subscription has an expiry date,
193: // schedule a timed event to remove the subscription at that point.
194: if (expiry != null) {
195: _expiryTimerTask = new TimerTask() {
196: public void run() {
197: super .cancel();
198: }
199: };
200: _timer.schedule(_expiryTimerTask, expiry);
201: }
202: }
203:
204: public Object[] getEventKeys() {
205: return _eventKeys;
206: }
207:
208: public String getPredicate() {
209: return _predicate;
210: }
211:
212: public int getConditionType() {
213: return _conditionType;
214: }
215:
216: public void cancel() {
217: super .cancel();
218: if (_expiryTimerTask != null) {
219: _expiryTimerTask.cancel();
220: _expiryTimerTask = null;
221: }
222: }
223: }
224:
225: protected class BasicTemporalEventSubscription extends
226: BasicEventSubscription implements TemporalEventSubscription {
227:
228: private final Duration _interval;
229: private final String _calendar;
230: private final boolean _recoverable;
231: private final TimerTask _timerTask;
232:
233: BasicTemporalEventSubscription(String eventType,
234: Date effective, Date expiry, int count,
235: Duration interval, String calendar,
236: boolean recoverable, String[] correlationKeys) {
237:
238: super (eventType, effective, expiry, count, correlationKeys);
239:
240: ApplicationEventUtil
241: .validateSubscription(eventType, effective, expiry,
242: count, interval, correlationKeys);
243:
244: _interval = interval;
245: _calendar = calendar;
246: _recoverable = recoverable;
247: _timerTask = new TimerTask() {
248: public void run() {
249: Date next = ApplicationEventUtil.handleTimeout(
250: BasicTemporalEventSubscription.this ,
251: new Date(scheduledExecutionTime()),
252: getServiceManager());
253: if (next != null) {
254: _timer.schedule(this , next);
255: }
256: }
257: };
258: _timer.schedule(_timerTask, effective);
259: }
260:
261: public Duration getInterval() {
262: return _interval;
263: }
264:
265: public String getCalendar() {
266: return _calendar;
267: }
268:
269: public boolean isRecoverable() {
270: return _recoverable;
271: }
272:
273: public void cancel() {
274: super .cancel();
275: _timerTask.cancel();
276: }
277: }
278:
279: private class DirectoryPoller implements Runnable {
280: public void run() {
281: _logger.info("Polling event directory every "
282: + POLL_INTERVAL + " milliseconds");
283:
284: try {
285: do {
286: try {
287: Thread.sleep(POLL_INTERVAL);
288: scanEventDirectory();
289: } catch (InterruptedException e) {
290: // Just wake up and look for work.
291: } catch (Exception e) {
292: _logger.error("Error scanning event directory",
293: e);
294: }
295: } while (isInitialized());
296: } finally {
297: _logger.info("event directory polling stopped");
298: }
299: }
300: }
301:
/**
 * Creates the broker. The poller thread is named and marked as a daemon
 * here, but is not started until {@link #init()}.
 *
 * @param svcMgr The service manager, passed on to the superclass.
 */
public BasicApplicationEventBroker(ServiceManager svcMgr) {
    super(svcMgr);
    _pollerThread.setDaemon(true);
    _pollerThread.setName("EventDirectoryPoller");
    _eventDir = new File(CommonConfig.getConfigDir(), "events");
}
309:
310: protected Log getLogger() {
311: return _logger;
312: }
313:
314: public synchronized void init() throws IOException,
315: RepositoryException {
316: super .init();
317:
318: // Check and if necessary create the config/event directory.
319: if (!_eventDir.exists())
320: _eventDir.mkdir();
321: else if (!_eventDir.isDirectory()) {
322: throw new RepositoryException(_eventDir
323: + " is not a directory");
324: } else if (!_eventDir.canRead()) {
325: throw new RepositoryException("Cannot read from directory "
326: + _eventDir);
327: } else if (!_eventDir.canWrite()) {
328: throw new RepositoryException("Cannot write to directory "
329: + _eventDir);
330: }
331:
332: if (POLL_INTERVAL > 0) {
333: _pollerThread.start();
334: _logger.info("Scanning directory " + _eventDir
335: + " for event files");
336: }
337: }
338:
339: public synchronized void exit() {
340: super .exit();
341: _pollerThread.interrupt();
342: }
343:
344: private void scanEventDirectory() throws EvaluatorException,
345: RepositoryException {
346:
347: File[] eventFiles = _eventDir.listFiles();
348: if (eventFiles == null) {
349: _logger.warn("Event directory no longer exists: "
350: + _eventDir);
351: return;
352: }
353: for (int i = 0; i < eventFiles.length; i++) {
354: File eventFile = eventFiles[i];
355: if (!eventFile.isFile()) {
356: _logger.info("Ignoring non-file entry: "
357: + eventFile.getName());
358: continue;
359: }
360:
361: FileInputStream in = null;
362: try {
363: // Convert the file to the Java type that is appropriate for its
364: // MIME content type. This data conversion requirement serves to
365: // illustrate that the DataConverter and ApplicationEventBroker
366: // services are closely linked in terms of their need to convert
367: // data between types.
368: // TODO: integrate ApplicationEvent content type recognition
369: // with the DataConverter service.
370: in = new FileInputStream(eventFile);
371: String contentType = FileTypeMap
372: .getDefaultFileTypeMap().getContentType(
373: eventFile);
374: Object source;
375: if (contentType.equals(MIMETypes.JAVA_OBJECT)) {
376: ObjectInputStream ois = new ObjectInputStream(in);
377: source = ois.readObject();
378: ois.close();
379: } else if (contentType.equals(MIMETypes.XML)) {
380: source = getServiceManager().getDataConverter()
381: .convertValue(eventFile, Document.class);
382: } else if (contentType.startsWith("text/")) {
383: source = getServiceManager().getDataConverter()
384: .convertValue(eventFile, String.class);
385: } else {
386: source = getServiceManager().getDataConverter()
387: .convertValue(eventFile, byte[].class);
388: }
389:
390: Map attributes = new HashMap();
391: attributes.put(ApplicationEvent.CONTENT_TYPE,
392: contentType);
393: publish(source, attributes);
394: } catch (FileNotFoundException e) {
395: // This probably means someone else deleted the file.
396: _logger
397: .warn("Event file has disappeared: "
398: + eventFile);
399: } catch (IOException e) {
400: throw new RepositoryException(e);
401: } catch (ClassNotFoundException e) {
402: throw new RepositoryException(e);
403: } finally {
404: if (in != null) {
405: try {
406: in.close();
407: } catch (IOException e) {
408: _logger.error(e);
409: }
410: }
411: eventFile.delete();
412: }
413: }
414: }
415:
416: protected void storeEvent(ApplicationEvent event,
417: EventTypeMetaData metaData) {
418:
419: synchronized (_storedEvents) {
420: _storedEvents.add(event);
421: }
422: }
423:
424: protected Collection findStoredEvents(String eventType,
425: Object[] subscriptionKeys) {
426:
427: synchronized (_storedEvents) {
428: Collection events = new ArrayList();
429: for (Iterator iter = _storedEvents.iterator(); iter
430: .hasNext();) {
431: ApplicationEvent event = (ApplicationEvent) iter.next();
432: if (event.getEventType().equals(eventType)
433: && Comparators.match(subscriptionKeys, event
434: .getKeys(), false)) {
435:
436: events.add(event);
437: }
438: }
439: return events;
440: }
441: }
442:
443: protected ApplicationEventSubscription createSubscription(
444: String eventType, Object[] eventKeys, String predicate,
445: Date effective, Date expiry, int count,
446: String[] correlationKeys) throws RepositoryException {
447:
448: ApplicationEventSubscription subscription = new BasicApplicationEventSubscription(
449: eventType, eventKeys, predicate, effective, expiry,
450: count, correlationKeys);
451:
452: // We must add the new subscription here because to do so in the
453: // constructor would violate OO design patterns. (Call to overrideable
454: // method from incompletely constructed object.)
455: registerSubscription(eventType, subscription);
456:
457: return subscription;
458: }
459:
460: private void registerSubscription(String eventType,
461: EventSubscription subscription) {
462:
463: Collection subscriptions = findSubscriptions(eventType);
464: synchronized (subscriptions) {
465: subscriptions.add(subscription);
466: }
467: }
468:
469: protected TemporalEventSubscription createSubscription(
470: String eventType, Date effective, Date expiry, int count,
471: Duration interval, String calendar, boolean recoverable,
472: String[] correlationKeys) throws RepositoryException {
473:
474: TemporalEventSubscription subscription = new BasicTemporalEventSubscription(
475: eventType, effective, expiry, count, interval,
476: calendar, recoverable, correlationKeys);
477:
478: // We must add the new subscription here because to do so in the
479: // constructor would violate OO design patterns. (Call to overrideable
480: // method from incompletely constructed object.)
481: registerSubscription(eventType, subscription);
482:
483: return subscription;
484: }
485:
486: public void unsubscribe(String[] correlationKeys, boolean exact)
487: throws RepositoryException {
488:
489: if (getLogger().isDebugEnabled()) {
490: getLogger().debug(
491: "unsubscribe(" + Arrays.asList(correlationKeys)
492: + ')');
493: }
494:
495: List matches = null;
496: Collection[] allSubscriptions = findAllSubscriptions(null);
497: for (int i = 0; i < allSubscriptions.length; i++) {
498: Collection subscrips = allSubscriptions[i];
499: synchronized (subscrips) {
500: for (Iterator iter = subscrips.iterator(); iter
501: .hasNext();) {
502: BasicEventSubscription subscrip = (BasicEventSubscription) iter
503: .next();
504:
505: if (Comparators.match(correlationKeys, subscrip
506: .getCorrelationKeys(), exact)) {
507:
508: if (matches == null) {
509: matches = new ArrayList();
510: }
511: matches.add(subscrip);
512: }
513: }
514: }
515: // Cancel the subscriptions individually to avoid the possibility of
516: // deadlock caused by external synchronization.
517: if (matches != null) {
518: for (int j = 0; j < matches.size(); j++) {
519: ((EventSubscription) matches.get(j)).cancel();
520: }
521: matches.clear();
522: }
523: }
524: }
525:
526: protected Collection findSubscriptions(ApplicationEvent event,
527: EventTypeMetaData metaData) {
528:
529: long now = System.currentTimeMillis();
530: Collection subscriptions = findSubscriptions(event
531: .getEventType());
532: Collection filtered = new ArrayList(subscriptions.size());
533: synchronized (subscriptions) {
534: for (Iterator iter = subscriptions.iterator(); iter
535: .hasNext();) {
536: Object o = iter.next();
537: if (o instanceof ApplicationEventSubscription) {
538: ApplicationEventSubscription candidate = (ApplicationEventSubscription) o;
539: Date effective = candidate.getEffective();
540: Date expiry = candidate.getExpiry();
541: if ((effective == null || effective.getTime() <= now)
542: && (expiry == null || expiry.getTime() > now)
543: && Comparators.match(candidate
544: .getEventKeys(), event.getKeys(),
545: false)) {
546:
547: filtered.add(candidate);
548: }
549: }
550: }
551: }
552: return filtered;
553: }
554:
555: protected Collection findSubscriptions(String eventType) {
556: synchronized (_subscriptionsByType) {
557: List subscrips = (List) _subscriptionsByType.get(eventType);
558: if (subscrips == null) {
559: subscrips = new ArrayList();
560: _subscriptionsByType.put(eventType, subscrips);
561: }
562: return subscrips;
563: }
564: }
565:
566: protected Collection[] findAllSubscriptions(String eventType) {
567: synchronized (_subscriptionsByType) {
568: Collection[] subscrips;
569: if (eventType != null) {
570: List list = (List) _subscriptionsByType.get(eventType);
571: if (list == null) {
572: list = new ArrayList();
573: _subscriptionsByType.put(eventType, list);
574: }
575: subscrips = new Collection[] { list };
576: } else {
577: subscrips = (Collection[]) _subscriptionsByType
578: .values().toArray(
579: new Collection[_subscriptionsByType
580: .size()]);
581: }
582: return subscrips;
583: }
584: }
585: }
|