0001: /*
0002: * This file is part of the WfMOpen project.
0003: * Copyright (C) 2001-2003 Danet GmbH (www.danet.de), GS-AN.
0004: * All rights reserved.
0005: *
0006: * This program is free software; you can redistribute it and/or modify
0007: * it under the terms of the GNU General Public License as published by
0008: * the Free Software Foundation; either version 2 of the License, or
0009: * (at your option) any later version.
0010: *
0011: * This program is distributed in the hope that it will be useful,
0012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0014: * GNU General Public License for more details.
0015: *
0016: * You should have received a copy of the GNU General Public License
0017: * along with this program; if not, write to the Free Software
0018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0019: *
0020: * $Id: StandardWorkflowService.java,v 1.13.2.1 2007/11/02 16:00:33 drmlipp Exp $
0021: *
0022: * $Log: StandardWorkflowService.java,v $
0023: * Revision 1.13.2.1 2007/11/02 16:00:33 drmlipp
0024: * Merged bug fixes from HEAD.
0025: *
0026: * Revision 1.14 2007/10/07 18:40:10 mlipp
0027: * Removed superfluous imports.
0028: *
0029: * Revision 1.13 2007/02/27 14:34:20 drmlipp
0030: * Some refactoring to reduce cyclic dependencies.
0031: *
0032: * Revision 1.12 2007/02/21 21:32:29 mlipp
0033: * Using pooled JMS connections when in EJB now.
0034: *
0035: * Revision 1.11 2007/02/17 21:22:56 mlipp
0036: * Improved service caching and topic connection handling.
0037: *
0038: * Revision 1.10 2007/02/16 21:43:23 mlipp
0039: * Improved.
0040: *
0041: * Revision 1.9 2007/02/15 14:28:29 drmlipp
0042: * Even more improvements.
0043: *
0044: * Revision 1.8 2007/02/15 14:09:45 drmlipp
0045: * Improved resource releasing.
0046: *
0047: * Revision 1.7 2007/02/15 13:52:37 drmlipp
0048: * Fixed channel release problem.
0049: *
0050: * Revision 1.6 2006/09/29 12:32:09 drmlipp
0051: * Consistently using WfMOpen as projct name now.
0052: *
0053: * Revision 1.5 2006/09/21 14:20:42 drmlipp
0054: * New method for retrieving current user.
0055: *
0056: * Revision 1.4 2005/08/17 21:22:20 mlipp
0057: * Removed deprecated method (as announced in 1.3).
0058: *
0059: * Revision 1.3 2005/08/17 21:15:31 mlipp
0060: * Synchronized with 1.3.1p3.
0061: *
0062: * Revision 1.1.1.1.6.3 2005/08/17 20:39:12 drmlipp
0063: * Removed usage of RAS on client side.
0064: *
0065: * Revision 1.2 2005/04/08 11:28:05 drmlipp
0066: * Merged changes from 1.3 branch up to 1.3p6.
0067: *
0068: * Revision 1.1.1.1.6.2 2005/04/07 12:13:03 drmlipp
0069: * Added event subscriber with filter.
0070: *
0071: * Revision 1.1.1.1.6.1 2005/04/06 15:42:06 drmlipp
0072: * Added additional support for accessing the event queue to
0073: * WorkflowService.
0074: *
0075: * Revision 1.1.1.1 2004/08/18 15:17:38 drmlipp
0076: * Update to 1.2
0077: *
0078: * Revision 1.5 2004/07/11 19:56:51 lipp
0079: * Adapted to JOnAS 4.1.2 naming scheme.
0080: *
0081: * Revision 1.4 2004/07/08 14:42:30 lipp
0082: * Providing support for separated connection factories.
0083: *
0084: * Revision 1.3 2004/07/06 11:45:20 lipp
0085: * Asure usability in various application servers.
0086: *
0087: * Revision 1.2 2004/06/14 19:37:20 lipp
0088: * Fixed assignment functions and cleaned up assignment related
0089: * interfaces.
0090: *
0091: * Revision 1.1 2004/02/21 21:31:00 lipp
0092: * Some more refactoring to resolve cyclic dependencies.
0093: *
0094: * Revision 1.28 2004/02/12 13:10:38 lipp
0095: * Renamed openChannel to getChannel (channels have no open state).
0096: *
0097: * Revision 1.27 2004/01/30 14:36:30 lipp
0098: * Partial implementation of message receipt.
0099: *
0100: * Revision 1.26 2004/01/28 16:11:38 lipp
0101: * Re-implementation of chabacc, Sender working.
0102: *
0103: * Revision 1.25 2004/01/23 12:49:26 lipp
0104: * Fixes to WorkflowService[Factory] implementation/documentation.
0105: *
0106: * Revision 1.24 2004/01/22 16:10:50 lipp
0107: * Channel message queue not available yet.
0108: *
0109: * Revision 1.23 2004/01/22 15:06:09 lipp
0110: * Clarified serializability of workflow service.
0111: *
0112: * Revision 1.22 2003/11/21 14:53:50 lipp
0113: * Create topic connection on client side, support initial context
0114: * override.
0115: *
0116: * Revision 1.21 2003/10/06 15:20:27 lipp
0117: * Made doFinish available in WorkflowService.
0118: *
0119: * Revision 1.20 2003/06/27 08:51:45 lipp
0120: * Fixed copyright/license information.
0121: *
0122: * Revision 1.19 2003/06/01 20:58:50 lipp
0123: * Moved toSAX to batch.
0124: *
0125: * Revision 1.18 2003/05/02 15:28:29 lipp
0126: * Resolved some more package dependencies.
0127: *
0128: * Revision 1.17 2003/05/02 14:55:58 lipp
0129: * Resolved some more package dependencies.
0130: *
0131: * Revision 1.16 2003/04/26 16:11:14 lipp
0132: * Moved some classes to reduce package dependencies.
0133: *
0134: * Revision 1.15 2003/04/22 16:35:25 lipp
0135: * Made SAXEventBuffer outer class.
0136: *
0137: * Revision 1.14 2003/03/31 16:50:28 huaiyang
0138: * Logging using common-logging.
0139: *
0140: * Revision 1.13 2003/02/25 17:08:05 lipp
0141: * Reorganized requester implementation.
0142: *
0143: * Revision 1.12 2003/02/11 08:50:41 lipp
0144: * Updated comment.
0145: *
0146: * Revision 1.11 2003/02/07 15:56:19 lipp
0147: * Implemented Requester notifications.
0148: *
0149: * Revision 1.10 2003/02/06 12:47:14 lipp
0150: * Implemented Requester (no event handling yet).
0151: *
0152: * Revision 1.9 2003/02/05 15:57:06 lipp
0153: * Replaced DummyRequester with DefaultRequester.
0154: *
0155: * Revision 1.8 2002/12/19 21:37:42 lipp
0156: * Reorganized interfaces.
0157: *
0158: * Revision 1.7 2002/12/19 16:23:46 lipp
0159: * Resolved illegal dependency between apis and danet.an.util.
0160: *
0161: * Revision 1.6 2002/12/10 11:21:05 lipp
0162: * Added batch processing as "generic DTO".
0163: *
0164: * Revision 1.5 2002/11/22 09:56:15 lipp
0165: * Clarified usage of the danet utility bean for user preferences.
0166: *
0167: * Revision 1.4 2002/09/19 14:37:37 lipp
0168: * Using WorkflowService.release now and optimized process definition
0169: * storage.
0170: *
0171: * Revision 1.3 2002/09/18 21:26:51 lipp
0172: * Removed SAXFacade (integrated with WorkflowEngine).
0173: *
0174: * Revision 1.2 2002/09/18 20:48:21 lipp
0175: * Cleanly separated workflow engine and service.
0176: *
0177: * Revision 1.1 2002/09/18 14:29:44 lipp
0178: * Partial workflow service implementation added.
0179: *
0180: */
0181: package de.danet.an.workflow.ejbs.client;
0182:
0183: import java.io.IOException;
0184:
0185: import java.util.ArrayList;
0186: import java.util.Collection;
0187: import java.util.HashMap;
0188: import java.util.HashSet;
0189: import java.util.Iterator;
0190: import java.util.Map;
0191: import java.util.Set;
0192: import java.util.StringTokenizer;
0193:
0194: import java.lang.ref.Reference;
0195: import java.lang.ref.ReferenceQueue;
0196: import java.lang.ref.WeakReference;
0197: import java.lang.reflect.InvocationTargetException;
0198: import java.rmi.RemoteException;
0199: import java.security.Principal;
0200:
0201: import javax.jms.ConnectionConsumer;
0202: import javax.jms.ConnectionFactory;
0203: import javax.jms.ConnectionMetaData;
0204: import javax.jms.Destination;
0205: import javax.jms.ExceptionListener;
0206: import javax.jms.JMSException;
0207: import javax.jms.Message;
0208: import javax.jms.MessageListener;
0209: import javax.jms.ObjectMessage;
0210: import javax.jms.ServerSessionPool;
0211: import javax.jms.Session;
0212: import javax.jms.Topic;
0213: import javax.jms.TopicConnection;
0214: import javax.jms.TopicConnectionFactory;
0215: import javax.jms.TopicSession;
0216: import javax.jms.TopicSubscriber;
0217: import javax.naming.Context;
0218: import javax.naming.NamingException;
0219:
0220: import de.danet.an.util.EJBUtil;
0221:
0222: import de.danet.an.workflow.omgcore.CannotCompleteException;
0223: import de.danet.an.workflow.omgcore.InvalidDataException;
0224: import de.danet.an.workflow.omgcore.InvalidPerformerException;
0225: import de.danet.an.workflow.omgcore.ProcessData;
0226: import de.danet.an.workflow.omgcore.WfActivity;
0227: import de.danet.an.workflow.omgcore.WfAuditEvent;
0228: import de.danet.an.workflow.omgcore.WfAuditHandler;
0229: import de.danet.an.workflow.omgcore.WfObject;
0230: import de.danet.an.workflow.omgcore.WfProcess;
0231: import de.danet.an.workflow.omgcore.WfRequester;
0232: import de.danet.an.workflow.omgcore.WfResource;
0233:
0234: import de.danet.an.workflow.api.Batch;
0235: import de.danet.an.workflow.api.Channel;
0236: import de.danet.an.workflow.api.Configuration;
0237: import de.danet.an.workflow.api.EventSubscriber;
0238: import de.danet.an.workflow.api.InvalidKeyException;
0239: import de.danet.an.workflow.api.Process;
0240: import de.danet.an.workflow.api.ProcessDefinitionDirectory;
0241: import de.danet.an.workflow.api.ProcessDirectory;
0242: import de.danet.an.workflow.api.WorkflowService;
0243:
0244: import de.danet.an.workflow.apix.ExtProcessDirectory;
0245: import de.danet.an.workflow.ejbs.WorkflowEngine;
0246:
0247: /**
0248: * This class provides the implementation of
0249: * {@link WorkflowService <code>WorkflowService</code>}.<P>
0250: *
0251: * If loglevel is <code>DEBUG</code>, every event subscriber created
0252: * with {@link #createEventSubscriber
0253: * <code>createEventReceiver</code>} logs the received events.
0254: *
0255: * @author <a href="mailto:lipp@danet.de"></a>
0256: * @version $Revision: 1.13.2.1 $
0257: */
0258:
0259: public class StandardWorkflowService implements WorkflowService,
0260: WfAuditHandler {
0261:
0262: private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
0263: .getLog(StandardWorkflowService.class);
0264:
0265: private Map serviceProperties = null;
0266:
0267: private Context initialContext = null;
0268:
0269: private Boolean jmsConnectionReusable = null;
0270: private TopicConnectionFactory topConFacCache = null;
0271: private TopicConnection topConUnwrappedCache = null;
0272: private TopicConnection topConCache = null;
0273: private Thread connectionCleanupThread = null;
0274: private Topic eventService = null;
0275: private Topic channelMessageOutTopic = null;
0276:
0277: private EventSubscriber reqEvtRec = null;
0278: private Map reqsByProcKey = null;
0279: private Map procKeysByReq = null;
0280: private Collection ignoredProcs = null;
0281:
0282: private ReferenceQueue reqRefQueue = null;
0283: private Thread reqRefCleaner = null;
0284:
0285: private WorkflowEngine engine = null;
0286:
0287: /**
0288: * Creates an instance of <code>StandardWorkflowService</code>
0289: * with all attributes initialized to default values.
0290: *
0291: * @param props the service properties
0292: * @param ic the initial context to be used for lookup of server
0293: * resources
0294: * @param e the workflow engine
0295: */
0296: public StandardWorkflowService(Map props, Context ic,
0297: WorkflowEngine e) {
0298: serviceProperties = props;
0299: initialContext = ic;
0300: engine = e;
0301: }
0302:
0303: private void initJmsSetup() throws RemoteException, JMSException {
0304: if (topConFacCache != null) {
0305: return;
0306: }
0307: Object[] esd = engine.eventServiceData();
0308: try {
0309: // This is a hack, but if anybody knowns a better way to
0310: // adapt to the application server used, please tell
0311: // me. This will eventually go away when JBoss gets a
0312: // client container, as we have application server
0313: // specific binding on the client side then.
0314: String conFacName = (String) esd[0];
0315: String topicConFacName = (String) esd[2];
0316:
0317: ConnectionFactory jbossInternalConFac = null;
0318: try {
0319: jbossInternalConFac = (ConnectionFactory) initialContext
0320: .lookup("java:/JmsXA");
0321: } catch (NamingException e) {
0322: // deliberately ignored
0323: }
0324: if (jbossInternalConFac == null) {
0325: try {
0326: jbossInternalConFac = (ConnectionFactory) initialContext
0327: .lookup("java:XAConnectionFactory");
0328: } catch (NamingException e) {
0329: // deliberately ignored
0330: }
0331: }
0332: // Comparison to "null" is workaround for JOnAS bug #300555
0333: if (jbossInternalConFac != null) {
0334: topConFacCache = (TopicConnectionFactory) jbossInternalConFac;
0335: } else if (conFacName != null && !conFacName.equals("null")) {
0336: // Use this if your AS provides a factory that
0337: // implements both Queue- and TopicConnectionFactory
0338: topConFacCache = (TopicConnectionFactory) initialContext
0339: .lookup(conFacName);
0340: } else {
0341: if (topicConFacName == null
0342: || topicConFacName.equals("null")) {
0343: if (engine
0344: .getClass()
0345: .getName()
0346: .startsWith(
0347: "de.danet.an.workflow.ejbs.JOnASWorkflowEngine")) {
0348: topicConFacName = "JTCF";
0349: } else {
0350: // popular default
0351: topicConFacName = "ConnectionFactory";
0352: }
0353: }
0354: topConFacCache = (TopicConnectionFactory) initialContext
0355: .lookup(topicConFacName);
0356: }
0357: eventService = (Topic) initialContext
0358: .lookup((String) esd[3]);
0359: channelMessageOutTopic = (Topic) initialContext
0360: .lookup((String) esd[4]);
0361: } catch (NamingException e) {
0362: logger.error(e.getMessage(), e);
0363: throw new IllegalStateException(e.getMessage());
0364: }
0365: }
0366:
0367: /**
0368: * Return a started topic connection.
0369: * @return the connection
0370: * @throws RemoteException if a system-level error occurs
0371: * @throws JMSException if the connection cannot be created
0372: */
0373: TopicConnection topicConnection() throws RemoteException,
0374: JMSException {
0375: synchronized (this ) {
0376: if (topConCache != null) {
0377: return topConCache;
0378: }
0379: if (jmsConnectionReusable == null) {
0380: initJmsSetup();
0381: // Crude, but currently the only known way to find out
0382: // whether we're running in server or client container.
0383: // And we need to know because of J2EE 1.4 spec, J2EE.6.6.
0384: TopicConnection topCon = topConFacCache
0385: .createTopicConnection();
0386: topCon.start();
0387: jmsConnectionReusable = Boolean.FALSE;
0388: TopicSession ts1 = null;
0389: TopicSession ts2 = null;
0390: try {
0391: ts1 = topCon.createTopicSession(false,
0392: Session.AUTO_ACKNOWLEDGE);
0393: ts2 = topCon.createTopicSession(false,
0394: Session.AUTO_ACKNOWLEDGE);
0395: jmsConnectionReusable = Boolean.TRUE;
0396: } catch (JMSException e) {
0397: // the created topic connection may still be handed out
0398: return topCon;
0399: } finally {
0400: if (ts1 != null) {
0401: ts1.close();
0402: }
0403: if (ts2 != null) {
0404: ts2.close();
0405: }
0406: }
0407: topConUnwrappedCache = topCon;
0408: topConCache = new TopicConnectionWrapper(topCon);
0409: connectionCleanupThread = new Thread() {
0410: public void run() {
0411: if (topConUnwrappedCache != null) {
0412: try {
0413: topConUnwrappedCache.close();
0414: } catch (JMSException e) {
0415: logger.warn("Cannot close - ignored: "
0416: + e.getMessage(), e);
0417: }
0418: topConCache = null;
0419: topConUnwrappedCache = null;
0420: }
0421: }
0422: };
0423: Runtime.getRuntime().addShutdownHook(
0424: connectionCleanupThread);
0425: return topConCache;
0426: }
0427: }
0428: TopicConnection topCon = ((TopicConnectionFactory) topConFacCache)
0429: .createTopicConnection();
0430: topCon.start();
0431: return topCon;
0432: }
0433:
0434: private class EventSubscriberImpl implements EventSubscriber {
0435:
0436: private TopicConnection connection;
0437: private TopicSession session;
0438: private TopicSubscriber subs;
0439:
0440: public EventSubscriberImpl(String processKey, String eventTypes)
0441: throws IOException {
0442: try {
0443: initJmsSetup();
0444: connection = topicConnection();
0445: session = connection.createTopicSession(false,
0446: Session.AUTO_ACKNOWLEDGE);
0447: if (processKey == null && eventTypes == null) {
0448: subs = session.createSubscriber(eventService);
0449: return;
0450: }
0451: StringBuffer filter = new StringBuffer();
0452: if (processKey != null) {
0453: filter.append("processKey = '" + processKey + "'");
0454: }
0455: if (eventTypes != null) {
0456: if (filter.length() > 0) {
0457: filter.append(" AND ");
0458: }
0459: filter.append("(");
0460: StringTokenizer st = new StringTokenizer(
0461: eventTypes, " \t\n\r\f,;");
0462: boolean first = true;
0463: while (st.hasMoreTokens()) {
0464: if (first) {
0465: first = false;
0466: } else {
0467: filter.append(" OR ");
0468: }
0469: filter.append("(eventType = '");
0470: filter.append(st.nextToken());
0471: filter.append("')");
0472: }
0473: if (first) {
0474: throw new IllegalArgumentException("\""
0475: + eventTypes + "\" is not a"
0476: + " valid list of event types.");
0477: }
0478: filter.append(")");
0479: }
0480: subs = session.createSubscriber(eventService, filter
0481: .toString(), true);
0482: if (logger.isDebugEnabled()) {
0483: logger.debug("Created " + this + " with filter \""
0484: + filter.toString() + "\"");
0485: }
0486: } catch (JMSException e) {
0487: throw (IOException) (new IOException(e.getMessage()))
0488: .initCause(e);
0489: }
0490: }
0491:
0492: public void close() {
0493: try {
0494: subs.close();
0495: session.close();
0496: connection.close();
0497: } catch (JMSException e) {
0498: logger.error("Closing topic: " + e.getMessage(), e);
0499: }
0500: subs = null;
0501: session = null;
0502: connection = null;
0503: }
0504:
0505: public WfAuditEvent receive() throws IOException {
0506: try {
0507: Object e = ((ObjectMessage) subs.receive()).getObject();
0508: if (logger.isDebugEnabled()) {
0509: logger.debug("EventSubscriber " + this
0510: + " received " + e);
0511: }
0512: return (WfAuditEvent) e;
0513: } catch (JMSException e) {
0514: throw (IOException) (new IOException(e.getMessage()))
0515: .initCause(e);
0516: }
0517: }
0518:
0519: public WfAuditEvent receive(long timeout) throws IOException {
0520: try {
0521: Object e = subs.receive(timeout);
0522: if (e == null) {
0523: return null;
0524: }
0525: e = ((ObjectMessage) e).getObject();
0526: if (logger.isDebugEnabled()) {
0527: logger.debug("EventSubscriber " + this
0528: + " received " + e);
0529: }
0530: return (WfAuditEvent) e;
0531: } catch (JMSException e) {
0532: throw (IOException) (new IOException(e.getMessage()))
0533: .initCause(e);
0534: }
0535: }
0536:
0537: public WfAuditEvent receiveNoWait() throws IOException {
0538: try {
0539: Object e = ((ObjectMessage) subs.receiveNoWait())
0540: .getObject();
0541: if (e == null) {
0542: return null;
0543: }
0544: if (logger.isDebugEnabled()) {
0545: logger.debug("EventSubscriber " + this
0546: + " received " + e);
0547: }
0548: return (WfAuditEvent) e;
0549: } catch (JMSException e) {
0550: throw (IOException) (new IOException(e.getMessage()))
0551: .initCause(e);
0552: }
0553: }
0554:
0555: public void setEventHandler(WfAuditHandler hndlr)
0556: throws IOException {
0557: final WfAuditHandler handler = hndlr;
0558: try {
0559: subs.setMessageListener(new MessageListener() {
0560: public void onMessage(Message msg) {
0561: Object e = null;
0562: try {
0563: e = ((ObjectMessage) msg).getObject();
0564: if (logger.isDebugEnabled()) {
0565: logger
0566: .debug("EventSubscriber "
0567: + this
0568: + " received "
0569: + e
0570: + " with processKey="
0571: + msg
0572: .getStringProperty("processKey")
0573: + " and eventType="
0574: + msg
0575: .getStringProperty("eventType"));
0576: }
0577: handler.receiveEvent((WfAuditEvent) e);
0578: } catch (JMSException ex) {
0579: logger.error(ex.getMessage(), ex);
0580: } catch (InvalidPerformerException ex) {
0581: // deliberatly ignored.
0582: } catch (RemoteException ex) {
0583: // deliberatly ignored.
0584: } catch (Exception ex) {
0585: logger.error(this + "cannot handle " + e
0586: + ": " + ex.getMessage(), ex);
0587: }
0588: }
0589: });
0590: } catch (JMSException e) {
0591: throw (IOException) (new IOException(e.getMessage()))
0592: .initCause(e);
0593: }
0594: }
0595: }
0596:
0597: /* Comment copied from interface. */
0598: public Map serviceProperties() throws RemoteException {
0599: return serviceProperties;
0600: }
0601:
0602: /* Comment copied from interface. */
0603: public Configuration configuration() throws RemoteException {
0604: return engine.configuration();
0605: }
0606:
0607: /* Comment copied from interface. */
0608: public ProcessDefinitionDirectory processDefinitionDirectory()
0609: throws RemoteException {
0610: return engine.processDefinitionDirectory();
0611: }
0612:
0613: /* Comment copied from interface. */
0614: public ProcessDirectory processDirectory() throws RemoteException {
0615: return engine.processDirectory();
0616: }
0617:
0618: /* Comment copied from interface. */
0619: public Collection knownResources() throws RemoteException {
0620: return engine.knownResources();
0621: }
0622:
0623: /* Comment copied from interface. */
0624: public WfResource resourceByKey(String key)
0625: throws InvalidKeyException, RemoteException {
0626: return engine.resourceByKey(key);
0627: }
0628:
0629: /* Comment copied from interface. */
0630: public Collection authorizers(WfResource resource)
0631: throws RemoteException {
0632: return engine.authorizers(resource);
0633: }
0634:
0635: /* Comment copied from interface. */
0636: public WfResource asResource(Principal principal)
0637: throws RemoteException, InvalidKeyException {
0638: return engine.asResource(principal);
0639: }
0640:
0641: /* Comment copied from interface. */
0642: public Collection requestedBy(WfRequester req)
0643: throws RemoteException {
0644: return ((ExtProcessDirectory) engine.processDirectory())
0645: .requestedBy(req);
0646: }
0647:
0648: /* Comment copied from interface. */
0649: public void doFinish(WfActivity act, ProcessData result)
0650: throws InvalidDataException, CannotCompleteException,
0651: RemoteException {
0652: engine.doFinish(act, result);
0653: }
0654:
0655: /* Comment copied from interface. */
0656: public Object executeBatch(Batch batch) throws RemoteException,
0657: InvocationTargetException {
0658: return engine.executeBatch(batch);
0659: }
0660:
0661: /* Comment copied from interface. */
0662: public WfObject eventReceiver(WfAuditHandler handler)
0663: throws RemoteException {
0664: try {
0665: EventSubscriber es = new EventSubscriberImpl(null, null);
0666: es.setEventHandler(handler);
0667: return es;
0668: } catch (IOException e) {
0669: // This mapping is wrong but backward compatible, and the
0670: // method now deprecated anyway.
0671: throw (RemoteException) (new RemoteException(e.getMessage()))
0672: .initCause(e);
0673: }
0674: }
0675:
0676: /* Comment copied from interface. */
0677: public EventSubscriber createEventSubscriber() throws IOException {
0678: return new EventSubscriberImpl(null, null);
0679: }
0680:
0681: /* Comment copied from interface. */
0682: public EventSubscriber createEventSubscriber(String processKey,
0683: String eventTypes) throws IOException {
0684: return new EventSubscriberImpl(processKey, eventTypes);
0685: }
0686:
0687: /* Comment copied from interface. */
0688: public void registerRequester(WfRequester requester)
0689: throws RemoteException {
0690: synchronized (this ) {
0691: if (reqEvtRec == null) {
0692: try {
0693: reqEvtRec = createEventSubscriber();
0694: reqEvtRec.setEventHandler(this );
0695: } catch (IOException e) {
0696: throw (IllegalStateException) (new IllegalStateException(
0697: e.getMessage())).initCause(e);
0698: }
0699: reqsByProcKey = new HashMap();
0700: procKeysByReq = new HashMap();
0701: ignoredProcs = new ArrayList();
0702: reqRefQueue = new ReferenceQueue();
0703: reqRefCleaner = new Thread() {
0704: public void run() {
0705: while (true) {
0706: try {
0707: Reference ref = reqRefQueue.remove();
0708: synchronized (StandardWorkflowService.this ) {
0709: Set procKeys = (Set) procKeysByReq
0710: .remove(ref);
0711: for (Iterator i = procKeys
0712: .iterator(); i.hasNext();) {
0713: reqsByProcKey.remove((String) i
0714: .next());
0715: }
0716: }
0717: } catch (InterruptedException e) {
0718: // deliberatly ignored
0719: }
0720: }
0721: }
0722: };
0723: reqRefCleaner.setDaemon(true);
0724: reqRefCleaner.start();
0725: }
0726: procKeysByReq.put(
0727: new WeakReference(requester, reqRefQueue),
0728: new HashSet());
0729: // we could demand a requester to be registered before first
0730: // use and thus avoid rebuilding ignoredProcs after every
0731: // registration. But then we would need elaborate code to
0732: // clean out old process keys from the ignoredProcs set...
0733: ignoredProcs.clear();
0734: }
0735: }
0736:
0737: /**
0738: * Receive an event and distribute it to the appropriate handler.
0739: *
0740: * @param evt the event.
0741: * @throws InvalidPerformerException thrown by the derived
0742: * {@link de.danet.an.workflow.omgcore.WfRequester
0743: * <code>WfRequester</code>} if it receives an event from a
0744: * process that is not among its performers.
0745: * @throws RemoteException if a system-level error occurs.
0746: */
0747: public void receiveEvent(WfAuditEvent evt)
0748: throws InvalidPerformerException, RemoteException {
0749: try {
0750: String procKey = evt.processKey();
0751: WfRequester requester = null;
0752: synchronized (this ) {
0753: WeakReference requesterRef = (WeakReference) reqsByProcKey
0754: .get(procKey);
0755: if (requesterRef != null) {
0756: requester = (WfRequester) requesterRef.get();
0757: }
0758: if (requester == null) {
0759: // if we know that none of ours handles it, discard
0760: if (ignoredProcs.contains(procKey)) {
0761: return;
0762: }
0763: // try to find handler
0764: found: for (Iterator i = procKeysByReq.keySet()
0765: .iterator(); i.hasNext();) {
0766: WeakReference reqRef = (WeakReference) i.next();
0767: WfRequester req = (WfRequester) reqRef.get();
0768: if (req == null) {
0769: continue;
0770: }
0771: Set assocKeys = (Set) procKeysByReq.get(reqRef);
0772: Collection perfs = ((WfRequester) req)
0773: .performers();
0774: for (Iterator p = perfs.iterator(); p.hasNext();) {
0775: WfProcess perf = (WfProcess) p.next();
0776: String pk = perf.key();
0777: reqsByProcKey.put(pk, reqRef);
0778: assocKeys.add(pk);
0779: if (pk.equals(procKey)) {
0780: requester = req;
0781: break found;
0782: }
0783: }
0784: }
0785: if (requester == null) {
0786: ignoredProcs.add(procKey);
0787: }
0788: }
0789: }
0790: // if we know, who handles this, forward (outside sync!)
0791: if (requester != null) {
0792: requester.receiveEvent(evt);
0793: }
0794: } catch (InvalidPerformerException e) {
0795: // deliberately ignored.
0796: } catch (RemoteException e) {
0797: // deliberately ignored.
0798: }
0799: }
0800:
0801: /**
0802: * Return the channel messages out topic.
0803: * @return the topic
0804: */
0805: Topic channelMessageOutTopic() {
0806: return channelMessageOutTopic;
0807: }
0808:
0809: /* Comment copied from interface. */
0810: public Channel getChannel(WfProcess process, String channelName)
0811: throws RemoteException {
0812: return new ChannelImpl(this , (Process) process, channelName,
0813: false);
0814: }
0815:
0816: /* Comment copied from interface. */
0817: public Channel getChannel(WfProcess process, String channelName,
0818: boolean sendOnly) throws RemoteException {
0819: return new ChannelImpl(this , (Process) process, channelName,
0820: sendOnly);
0821: }
0822:
0823: /**
0824: * Free any allocated resources.
0825: */
0826: public void release() {
0827: if (connectionCleanupThread != null) {
0828: Runtime.getRuntime().removeShutdownHook(
0829: connectionCleanupThread);
0830: connectionCleanupThread.run();
0831: connectionCleanupThread = null;
0832: }
0833: engine = null;
0834: }
0835:
0836: /* Comment copied from interface. */
0837: public void release(WfObject obj) {
0838: if (obj instanceof EventSubscriberImpl) {
0839: ((EventSubscriberImpl) obj).close();
0840: return;
0841: }
0842: if (obj instanceof ChannelImpl) {
0843: ((ChannelImpl) obj).release();
0844: return;
0845: }
0846: if (obj instanceof StandardWorkflowService) {
0847: ((StandardWorkflowService) obj).release();
0848: return;
0849: }
0850: EJBUtil.removeSession(obj);
0851: }
0852:
0853: /* (non-Javadoc)
0854: * @see de.danet.an.workflow.api.WorkflowService#caller()
0855: */
0856: public Principal caller() throws RemoteException {
0857: return engine.caller();
0858: }
0859:
0860: /**
0861: * This class provides a wrapper around a topic connection that
0862: * prevents it from being stopped or closed (redefined to noop).
0863: *
0864: * @author Michael Lipp
0865: *
0866: */
0867: public static class TopicConnectionWrapper implements
0868: TopicConnection {
0869: private TopicConnection delegee;
0870:
0871: /**
0872: * Create a new instance with all attributes initialized
0873: * to defaults or the given values.
0874: *
0875: * @param delegee
0876: */
0877: public TopicConnectionWrapper(TopicConnection delegee) {
0878: this .delegee = delegee;
0879: }
0880:
0881: /**
0882: * @throws JMSException
0883: * @see javax.jms.Connection#close()
0884: */
0885: public void close() throws JMSException {
0886: }
0887:
0888: /**
0889: * @param arg0
0890: * @param arg1
0891: * @param arg2
0892: * @param arg3
0893: * @return
0894: * @throws JMSException
0895: * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination, java.lang.String, javax.jms.ServerSessionPool, int)
0896: */
0897: public ConnectionConsumer createConnectionConsumer(
0898: Destination arg0, String arg1, ServerSessionPool arg2,
0899: int arg3) throws JMSException {
0900: return delegee.createConnectionConsumer(arg0, arg1, arg2,
0901: arg3);
0902: }
0903:
0904: /**
0905: * @param arg0
0906: * @param arg1
0907: * @param arg2
0908: * @param arg3
0909: * @return
0910: * @throws JMSException
0911: * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic, java.lang.String, javax.jms.ServerSessionPool, int)
0912: */
0913: public ConnectionConsumer createConnectionConsumer(Topic arg0,
0914: String arg1, ServerSessionPool arg2, int arg3)
0915: throws JMSException {
0916: return delegee.createConnectionConsumer(arg0, arg1, arg2,
0917: arg3);
0918: }
0919:
0920: /**
0921: * @param arg0
0922: * @param arg1
0923: * @param arg2
0924: * @param arg3
0925: * @param arg4
0926: * @return
0927: * @throws JMSException
0928: * @see javax.jms.TopicConnection#createDurableConnectionConsumer(javax.jms.Topic, java.lang.String, java.lang.String, javax.jms.ServerSessionPool, int)
0929: */
0930: public ConnectionConsumer createDurableConnectionConsumer(
0931: Topic arg0, String arg1, String arg2,
0932: ServerSessionPool arg3, int arg4) throws JMSException {
0933: return delegee.createDurableConnectionConsumer(arg0, arg1,
0934: arg2, arg3, arg4);
0935: }
0936:
0937: /**
0938: * @param arg0
0939: * @param arg1
0940: * @return
0941: * @throws JMSException
0942: * @see javax.jms.Connection#createSession(boolean, int)
0943: */
0944: public Session createSession(boolean arg0, int arg1)
0945: throws JMSException {
0946: return delegee.createSession(arg0, arg1);
0947: }
0948:
0949: /**
0950: * @param arg0
0951: * @param arg1
0952: * @return
0953: * @throws JMSException
0954: * @see javax.jms.TopicConnection#createTopicSession(boolean, int)
0955: */
0956: public TopicSession createTopicSession(boolean arg0, int arg1)
0957: throws JMSException {
0958: return delegee.createTopicSession(arg0, arg1);
0959: }
0960:
0961: /**
0962: * @return
0963: * @throws JMSException
0964: * @see javax.jms.Connection#getClientID()
0965: */
0966: public String getClientID() throws JMSException {
0967: return delegee.getClientID();
0968: }
0969:
0970: /**
0971: * @return
0972: * @throws JMSException
0973: * @see javax.jms.Connection#getExceptionListener()
0974: */
0975: public ExceptionListener getExceptionListener()
0976: throws JMSException {
0977: return delegee.getExceptionListener();
0978: }
0979:
0980: /**
0981: * @return
0982: * @throws JMSException
0983: * @see javax.jms.Connection#getMetaData()
0984: */
0985: public ConnectionMetaData getMetaData() throws JMSException {
0986: return delegee.getMetaData();
0987: }
0988:
0989: /**
0990: * @param arg0
0991: * @throws JMSException
0992: * @see javax.jms.Connection#setClientID(java.lang.String)
0993: */
0994: public void setClientID(String arg0) throws JMSException {
0995: delegee.setClientID(arg0);
0996: }
0997:
0998: /**
0999: * @param arg0
1000: * @throws JMSException
1001: * @see javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)
1002: */
1003: public void setExceptionListener(ExceptionListener arg0)
1004: throws JMSException {
1005: delegee.setExceptionListener(arg0);
1006: }
1007:
1008: /**
1009: * @throws JMSException
1010: * @see javax.jms.Connection#start()
1011: */
1012: public void start() throws JMSException {
1013: delegee.start();
1014: }
1015:
1016: /**
1017: * @throws JMSException
1018: * @see javax.jms.Connection#stop()
1019: */
1020: public void stop() throws JMSException {
1021: }
1022: }
1023: }
|