Source Code Cross Referenced for StandardWorkflowService.java in  » Workflow-Engines » wfmopen-2.1.1 » de » danet » an » workflow » ejbs » client » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » wfmopen 2.1.1 » de.danet.an.workflow.ejbs.client 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * This file is part of the WfMOpen project.
0003:         * Copyright (C) 2001-2003 Danet GmbH (www.danet.de), GS-AN.
0004:         * All rights reserved.
0005:         *
0006:         * This program is free software; you can redistribute it and/or modify
0007:         * it under the terms of the GNU General Public License as published by
0008:         * the Free Software Foundation; either version 2 of the License, or
0009:         * (at your option) any later version.
0010:         *
0011:         * This program is distributed in the hope that it will be useful,
0012:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0013:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0014:         * GNU General Public License for more details.
0015:         *
0016:         * You should have received a copy of the GNU General Public License
0017:         * along with this program; if not, write to the Free Software
0018:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
0019:         *
0020:         * $Id: StandardWorkflowService.java,v 1.13.2.1 2007/11/02 16:00:33 drmlipp Exp $
0021:         *
0022:         * $Log: StandardWorkflowService.java,v $
0023:         * Revision 1.13.2.1  2007/11/02 16:00:33  drmlipp
0024:         * Merged bug fixes from HEAD.
0025:         *
0026:         * Revision 1.14  2007/10/07 18:40:10  mlipp
0027:         * Removed superfluous imports.
0028:         *
0029:         * Revision 1.13  2007/02/27 14:34:20  drmlipp
0030:         * Some refactoring to reduce cyclic dependencies.
0031:         *
0032:         * Revision 1.12  2007/02/21 21:32:29  mlipp
0033:         * Using pooled JMS connections when in EJB now.
0034:         *
0035:         * Revision 1.11  2007/02/17 21:22:56  mlipp
0036:         * Improved service caching and topic connection handling.
0037:         *
0038:         * Revision 1.10  2007/02/16 21:43:23  mlipp
0039:         * Improved.
0040:         *
0041:         * Revision 1.9  2007/02/15 14:28:29  drmlipp
0042:         * Even more improvements.
0043:         *
0044:         * Revision 1.8  2007/02/15 14:09:45  drmlipp
0045:         * Improved resource releasing.
0046:         *
0047:         * Revision 1.7  2007/02/15 13:52:37  drmlipp
0048:         * Fixed channel release problem.
0049:         *
0050:         * Revision 1.6  2006/09/29 12:32:09  drmlipp
0051:         * Consistently using WfMOpen as projct name now.
0052:         *
0053:         * Revision 1.5  2006/09/21 14:20:42  drmlipp
0054:         * New method for retrieving current user.
0055:         *
0056:         * Revision 1.4  2005/08/17 21:22:20  mlipp
0057:         * Removed deprecated method (as announced in 1.3).
0058:         *
0059:         * Revision 1.3  2005/08/17 21:15:31  mlipp
0060:         * Synchronized with 1.3.1p3.
0061:         *
0062:         * Revision 1.1.1.1.6.3  2005/08/17 20:39:12  drmlipp
0063:         * Removed usage of RAS on client side.
0064:         *
0065:         * Revision 1.2  2005/04/08 11:28:05  drmlipp
0066:         * Merged changes from 1.3 branch up to 1.3p6.
0067:         *
0068:         * Revision 1.1.1.1.6.2  2005/04/07 12:13:03  drmlipp
0069:         * Added event subscriber with filter.
0070:         *
0071:         * Revision 1.1.1.1.6.1  2005/04/06 15:42:06  drmlipp
0072:         * Added additional support for accessing the event queue to
0073:         * WorkflowService.
0074:         *
0075:         * Revision 1.1.1.1  2004/08/18 15:17:38  drmlipp
0076:         * Update to 1.2
0077:         *
0078:         * Revision 1.5  2004/07/11 19:56:51  lipp
0079:         * Adapted to JOnAS 4.1.2 naming scheme.
0080:         *
0081:         * Revision 1.4  2004/07/08 14:42:30  lipp
0082:         * Providing support for separated connection factories.
0083:         *
0084:         * Revision 1.3  2004/07/06 11:45:20  lipp
0085:         * Asure usability in various application servers.
0086:         *
0087:         * Revision 1.2  2004/06/14 19:37:20  lipp
0088:         * Fixed assignment functions and cleaned up assignment related
0089:         * interfaces.
0090:         *
0091:         * Revision 1.1  2004/02/21 21:31:00  lipp
0092:         * Some more refactoring to resolve cyclic dependencies.
0093:         *
0094:         * Revision 1.28  2004/02/12 13:10:38  lipp
0095:         * Renamed openChannel to getChannel (channels have no open state).
0096:         *
0097:         * Revision 1.27  2004/01/30 14:36:30  lipp
0098:         * Partial implementation of message receipt.
0099:         *
0100:         * Revision 1.26  2004/01/28 16:11:38  lipp
0101:         * Re-implementation of chabacc, Sender working.
0102:         *
0103:         * Revision 1.25  2004/01/23 12:49:26  lipp
0104:         * Fixes to WorkflowService[Factory] implementation/documentation.
0105:         *
0106:         * Revision 1.24  2004/01/22 16:10:50  lipp
0107:         * Channel message queue not available yet.
0108:         *
0109:         * Revision 1.23  2004/01/22 15:06:09  lipp
0110:         * Clarified serializability of workflow service.
0111:         *
0112:         * Revision 1.22  2003/11/21 14:53:50  lipp
0113:         * Create topic connection on client side, support initial context
0114:         * override.
0115:         *
0116:         * Revision 1.21  2003/10/06 15:20:27  lipp
0117:         * Made doFinish available in WorkflowService.
0118:         *
0119:         * Revision 1.20  2003/06/27 08:51:45  lipp
0120:         * Fixed copyright/license information.
0121:         *
0122:         * Revision 1.19  2003/06/01 20:58:50  lipp
0123:         * Moved toSAX to batch.
0124:         *
0125:         * Revision 1.18  2003/05/02 15:28:29  lipp
0126:         * Resolved some more package dependencies.
0127:         *
0128:         * Revision 1.17  2003/05/02 14:55:58  lipp
0129:         * Resolved some more package dependencies.
0130:         *
0131:         * Revision 1.16  2003/04/26 16:11:14  lipp
0132:         * Moved some classes to reduce package dependencies.
0133:         *
0134:         * Revision 1.15  2003/04/22 16:35:25  lipp
0135:         * Made SAXEventBuffer outer class.
0136:         *
0137:         * Revision 1.14  2003/03/31 16:50:28  huaiyang
0138:         * Logging using common-logging.
0139:         *
0140:         * Revision 1.13  2003/02/25 17:08:05  lipp
0141:         * Reorganized requester implementation.
0142:         *
0143:         * Revision 1.12  2003/02/11 08:50:41  lipp
0144:         * Updated comment.
0145:         *
0146:         * Revision 1.11  2003/02/07 15:56:19  lipp
0147:         * Implemented Requester notifications.
0148:         *
0149:         * Revision 1.10  2003/02/06 12:47:14  lipp
0150:         * Implemented Requester (no event handling yet).
0151:         *
0152:         * Revision 1.9  2003/02/05 15:57:06  lipp
0153:         * Replaced DummyRequester with DefaultRequester.
0154:         *
0155:         * Revision 1.8  2002/12/19 21:37:42  lipp
0156:         * Reorganized interfaces.
0157:         *
0158:         * Revision 1.7  2002/12/19 16:23:46  lipp
0159:         * Resolved illegal dependency between apis and danet.an.util.
0160:         *
0161:         * Revision 1.6  2002/12/10 11:21:05  lipp
0162:         * Added batch processing as "generic DTO".
0163:         *
0164:         * Revision 1.5  2002/11/22 09:56:15  lipp
0165:         * Clarified usage of the danet utility bean for user preferences.
0166:         *
0167:         * Revision 1.4  2002/09/19 14:37:37  lipp
0168:         * Using WorkflowService.release now and optimized process definition
0169:         * storage.
0170:         *
0171:         * Revision 1.3  2002/09/18 21:26:51  lipp
0172:         * Removed SAXFacade (integrated with WorkflowEngine).
0173:         *
0174:         * Revision 1.2  2002/09/18 20:48:21  lipp
0175:         * Cleanly separated workflow engine and service.
0176:         *
0177:         * Revision 1.1  2002/09/18 14:29:44  lipp
0178:         * Partial workflow service implementation added.
0179:         *
0180:         */
0181:        package de.danet.an.workflow.ejbs.client;
0182:
0183:        import java.io.IOException;
0184:
0185:        import java.util.ArrayList;
0186:        import java.util.Collection;
0187:        import java.util.HashMap;
0188:        import java.util.HashSet;
0189:        import java.util.Iterator;
0190:        import java.util.Map;
0191:        import java.util.Set;
0192:        import java.util.StringTokenizer;
0193:
0194:        import java.lang.ref.Reference;
0195:        import java.lang.ref.ReferenceQueue;
0196:        import java.lang.ref.WeakReference;
0197:        import java.lang.reflect.InvocationTargetException;
0198:        import java.rmi.RemoteException;
0199:        import java.security.Principal;
0200:
0201:        import javax.jms.ConnectionConsumer;
0202:        import javax.jms.ConnectionFactory;
0203:        import javax.jms.ConnectionMetaData;
0204:        import javax.jms.Destination;
0205:        import javax.jms.ExceptionListener;
0206:        import javax.jms.JMSException;
0207:        import javax.jms.Message;
0208:        import javax.jms.MessageListener;
0209:        import javax.jms.ObjectMessage;
0210:        import javax.jms.ServerSessionPool;
0211:        import javax.jms.Session;
0212:        import javax.jms.Topic;
0213:        import javax.jms.TopicConnection;
0214:        import javax.jms.TopicConnectionFactory;
0215:        import javax.jms.TopicSession;
0216:        import javax.jms.TopicSubscriber;
0217:        import javax.naming.Context;
0218:        import javax.naming.NamingException;
0219:
0220:        import de.danet.an.util.EJBUtil;
0221:
0222:        import de.danet.an.workflow.omgcore.CannotCompleteException;
0223:        import de.danet.an.workflow.omgcore.InvalidDataException;
0224:        import de.danet.an.workflow.omgcore.InvalidPerformerException;
0225:        import de.danet.an.workflow.omgcore.ProcessData;
0226:        import de.danet.an.workflow.omgcore.WfActivity;
0227:        import de.danet.an.workflow.omgcore.WfAuditEvent;
0228:        import de.danet.an.workflow.omgcore.WfAuditHandler;
0229:        import de.danet.an.workflow.omgcore.WfObject;
0230:        import de.danet.an.workflow.omgcore.WfProcess;
0231:        import de.danet.an.workflow.omgcore.WfRequester;
0232:        import de.danet.an.workflow.omgcore.WfResource;
0233:
0234:        import de.danet.an.workflow.api.Batch;
0235:        import de.danet.an.workflow.api.Channel;
0236:        import de.danet.an.workflow.api.Configuration;
0237:        import de.danet.an.workflow.api.EventSubscriber;
0238:        import de.danet.an.workflow.api.InvalidKeyException;
0239:        import de.danet.an.workflow.api.Process;
0240:        import de.danet.an.workflow.api.ProcessDefinitionDirectory;
0241:        import de.danet.an.workflow.api.ProcessDirectory;
0242:        import de.danet.an.workflow.api.WorkflowService;
0243:
0244:        import de.danet.an.workflow.apix.ExtProcessDirectory;
0245:        import de.danet.an.workflow.ejbs.WorkflowEngine;
0246:
0247:        /**
0248:         * This class provides the implementation of 
0249:         * {@link WorkflowService <code>WorkflowService</code>}.<P>
0250:         *
0251:         * If loglevel is <code>DEBUG</code>, every event subscriber created
0252:         * with {@link #createEventSubscriber
0253:         * <code>createEventReceiver</code>} logs the received events.
0254:         *
0255:         * @author <a href="mailto:lipp@danet.de"></a>
0256:         * @version $Revision: 1.13.2.1 $
0257:         */
0258:
0259:        public class StandardWorkflowService implements  WorkflowService,
0260:                WfAuditHandler {
0261:
0262:            private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
0263:                    .getLog(StandardWorkflowService.class);
0264:
0265:            private Map serviceProperties = null;
0266:
0267:            private Context initialContext = null;
0268:
0269:            private Boolean jmsConnectionReusable = null;
0270:            private TopicConnectionFactory topConFacCache = null;
0271:            private TopicConnection topConUnwrappedCache = null;
0272:            private TopicConnection topConCache = null;
0273:            private Thread connectionCleanupThread = null;
0274:            private Topic eventService = null;
0275:            private Topic channelMessageOutTopic = null;
0276:
0277:            private EventSubscriber reqEvtRec = null;
0278:            private Map reqsByProcKey = null;
0279:            private Map procKeysByReq = null;
0280:            private Collection ignoredProcs = null;
0281:
0282:            private ReferenceQueue reqRefQueue = null;
0283:            private Thread reqRefCleaner = null;
0284:
0285:            private WorkflowEngine engine = null;
0286:
0287:            /**
0288:             * Creates an instance of <code>StandardWorkflowService</code>
0289:             * with all attributes initialized to default values.
0290:             *
0291:             * @param props the service properties
0292:             * @param ic the initial context to be used for lookup of server
0293:             * resources
0294:             * @param e the workflow engine
0295:             */
0296:            public StandardWorkflowService(Map props, Context ic,
0297:                    WorkflowEngine e) {
0298:                serviceProperties = props;
0299:                initialContext = ic;
0300:                engine = e;
0301:            }
0302:
0303:            private void initJmsSetup() throws RemoteException, JMSException {
0304:                if (topConFacCache != null) {
0305:                    return;
0306:                }
0307:                Object[] esd = engine.eventServiceData();
0308:                try {
0309:                    // This is a hack, but if anybody knowns a better way to
0310:                    // adapt to the application server used, please tell
0311:                    // me. This will eventually go away when JBoss gets a
0312:                    // client container, as we have application server
0313:                    // specific binding on the client side then.
0314:                    String conFacName = (String) esd[0];
0315:                    String topicConFacName = (String) esd[2];
0316:
0317:                    ConnectionFactory jbossInternalConFac = null;
0318:                    try {
0319:                        jbossInternalConFac = (ConnectionFactory) initialContext
0320:                                .lookup("java:/JmsXA");
0321:                    } catch (NamingException e) {
0322:                        // deliberately ignored
0323:                    }
0324:                    if (jbossInternalConFac == null) {
0325:                        try {
0326:                            jbossInternalConFac = (ConnectionFactory) initialContext
0327:                                    .lookup("java:XAConnectionFactory");
0328:                        } catch (NamingException e) {
0329:                            // deliberately ignored
0330:                        }
0331:                    }
0332:                    // Comparison to "null" is workaround for JOnAS bug #300555
0333:                    if (jbossInternalConFac != null) {
0334:                        topConFacCache = (TopicConnectionFactory) jbossInternalConFac;
0335:                    } else if (conFacName != null && !conFacName.equals("null")) {
0336:                        // Use this if your AS provides a factory that
0337:                        // implements both Queue- and TopicConnectionFactory
0338:                        topConFacCache = (TopicConnectionFactory) initialContext
0339:                                .lookup(conFacName);
0340:                    } else {
0341:                        if (topicConFacName == null
0342:                                || topicConFacName.equals("null")) {
0343:                            if (engine
0344:                                    .getClass()
0345:                                    .getName()
0346:                                    .startsWith(
0347:                                            "de.danet.an.workflow.ejbs.JOnASWorkflowEngine")) {
0348:                                topicConFacName = "JTCF";
0349:                            } else {
0350:                                // popular default
0351:                                topicConFacName = "ConnectionFactory";
0352:                            }
0353:                        }
0354:                        topConFacCache = (TopicConnectionFactory) initialContext
0355:                                .lookup(topicConFacName);
0356:                    }
0357:                    eventService = (Topic) initialContext
0358:                            .lookup((String) esd[3]);
0359:                    channelMessageOutTopic = (Topic) initialContext
0360:                            .lookup((String) esd[4]);
0361:                } catch (NamingException e) {
0362:                    logger.error(e.getMessage(), e);
0363:                    throw new IllegalStateException(e.getMessage());
0364:                }
0365:            }
0366:
0367:            /**
0368:             * Return a started topic connection.
0369:             * @return the connection
0370:             * @throws RemoteException if a system-level error occurs
0371:             * @throws JMSException if the connection cannot be created
0372:             */
0373:            TopicConnection topicConnection() throws RemoteException,
0374:                    JMSException {
0375:                synchronized (this ) {
0376:                    if (topConCache != null) {
0377:                        return topConCache;
0378:                    }
0379:                    if (jmsConnectionReusable == null) {
0380:                        initJmsSetup();
0381:                        // Crude, but currently the only known way to find out 
0382:                        // whether we're running in server or client container.
0383:                        // And we need to know because of J2EE 1.4 spec, J2EE.6.6.
0384:                        TopicConnection topCon = topConFacCache
0385:                                .createTopicConnection();
0386:                        topCon.start();
0387:                        jmsConnectionReusable = Boolean.FALSE;
0388:                        TopicSession ts1 = null;
0389:                        TopicSession ts2 = null;
0390:                        try {
0391:                            ts1 = topCon.createTopicSession(false,
0392:                                    Session.AUTO_ACKNOWLEDGE);
0393:                            ts2 = topCon.createTopicSession(false,
0394:                                    Session.AUTO_ACKNOWLEDGE);
0395:                            jmsConnectionReusable = Boolean.TRUE;
0396:                        } catch (JMSException e) {
0397:                            // the created topic connection may still be handed out
0398:                            return topCon;
0399:                        } finally {
0400:                            if (ts1 != null) {
0401:                                ts1.close();
0402:                            }
0403:                            if (ts2 != null) {
0404:                                ts2.close();
0405:                            }
0406:                        }
0407:                        topConUnwrappedCache = topCon;
0408:                        topConCache = new TopicConnectionWrapper(topCon);
0409:                        connectionCleanupThread = new Thread() {
0410:                            public void run() {
0411:                                if (topConUnwrappedCache != null) {
0412:                                    try {
0413:                                        topConUnwrappedCache.close();
0414:                                    } catch (JMSException e) {
0415:                                        logger.warn("Cannot close - ignored: "
0416:                                                + e.getMessage(), e);
0417:                                    }
0418:                                    topConCache = null;
0419:                                    topConUnwrappedCache = null;
0420:                                }
0421:                            }
0422:                        };
0423:                        Runtime.getRuntime().addShutdownHook(
0424:                                connectionCleanupThread);
0425:                        return topConCache;
0426:                    }
0427:                }
0428:                TopicConnection topCon = ((TopicConnectionFactory) topConFacCache)
0429:                        .createTopicConnection();
0430:                topCon.start();
0431:                return topCon;
0432:            }
0433:
0434:            private class EventSubscriberImpl implements  EventSubscriber {
0435:
0436:                private TopicConnection connection;
0437:                private TopicSession session;
0438:                private TopicSubscriber subs;
0439:
0440:                public EventSubscriberImpl(String processKey, String eventTypes)
0441:                        throws IOException {
0442:                    try {
0443:                        initJmsSetup();
0444:                        connection = topicConnection();
0445:                        session = connection.createTopicSession(false,
0446:                                Session.AUTO_ACKNOWLEDGE);
0447:                        if (processKey == null && eventTypes == null) {
0448:                            subs = session.createSubscriber(eventService);
0449:                            return;
0450:                        }
0451:                        StringBuffer filter = new StringBuffer();
0452:                        if (processKey != null) {
0453:                            filter.append("processKey = '" + processKey + "'");
0454:                        }
0455:                        if (eventTypes != null) {
0456:                            if (filter.length() > 0) {
0457:                                filter.append(" AND ");
0458:                            }
0459:                            filter.append("(");
0460:                            StringTokenizer st = new StringTokenizer(
0461:                                    eventTypes, " \t\n\r\f,;");
0462:                            boolean first = true;
0463:                            while (st.hasMoreTokens()) {
0464:                                if (first) {
0465:                                    first = false;
0466:                                } else {
0467:                                    filter.append(" OR ");
0468:                                }
0469:                                filter.append("(eventType = '");
0470:                                filter.append(st.nextToken());
0471:                                filter.append("')");
0472:                            }
0473:                            if (first) {
0474:                                throw new IllegalArgumentException("\""
0475:                                        + eventTypes + "\" is not a"
0476:                                        + " valid list of event types.");
0477:                            }
0478:                            filter.append(")");
0479:                        }
0480:                        subs = session.createSubscriber(eventService, filter
0481:                                .toString(), true);
0482:                        if (logger.isDebugEnabled()) {
0483:                            logger.debug("Created " + this  + " with filter \""
0484:                                    + filter.toString() + "\"");
0485:                        }
0486:                    } catch (JMSException e) {
0487:                        throw (IOException) (new IOException(e.getMessage()))
0488:                                .initCause(e);
0489:                    }
0490:                }
0491:
0492:                public void close() {
0493:                    try {
0494:                        subs.close();
0495:                        session.close();
0496:                        connection.close();
0497:                    } catch (JMSException e) {
0498:                        logger.error("Closing topic: " + e.getMessage(), e);
0499:                    }
0500:                    subs = null;
0501:                    session = null;
0502:                    connection = null;
0503:                }
0504:
0505:                public WfAuditEvent receive() throws IOException {
0506:                    try {
0507:                        Object e = ((ObjectMessage) subs.receive()).getObject();
0508:                        if (logger.isDebugEnabled()) {
0509:                            logger.debug("EventSubscriber " + this 
0510:                                    + " received " + e);
0511:                        }
0512:                        return (WfAuditEvent) e;
0513:                    } catch (JMSException e) {
0514:                        throw (IOException) (new IOException(e.getMessage()))
0515:                                .initCause(e);
0516:                    }
0517:                }
0518:
0519:                public WfAuditEvent receive(long timeout) throws IOException {
0520:                    try {
0521:                        Object e = subs.receive(timeout);
0522:                        if (e == null) {
0523:                            return null;
0524:                        }
0525:                        e = ((ObjectMessage) e).getObject();
0526:                        if (logger.isDebugEnabled()) {
0527:                            logger.debug("EventSubscriber " + this 
0528:                                    + " received " + e);
0529:                        }
0530:                        return (WfAuditEvent) e;
0531:                    } catch (JMSException e) {
0532:                        throw (IOException) (new IOException(e.getMessage()))
0533:                                .initCause(e);
0534:                    }
0535:                }
0536:
0537:                public WfAuditEvent receiveNoWait() throws IOException {
0538:                    try {
0539:                        Object e = ((ObjectMessage) subs.receiveNoWait())
0540:                                .getObject();
0541:                        if (e == null) {
0542:                            return null;
0543:                        }
0544:                        if (logger.isDebugEnabled()) {
0545:                            logger.debug("EventSubscriber " + this 
0546:                                    + " received " + e);
0547:                        }
0548:                        return (WfAuditEvent) e;
0549:                    } catch (JMSException e) {
0550:                        throw (IOException) (new IOException(e.getMessage()))
0551:                                .initCause(e);
0552:                    }
0553:                }
0554:
0555:                public void setEventHandler(WfAuditHandler hndlr)
0556:                        throws IOException {
0557:                    final WfAuditHandler handler = hndlr;
0558:                    try {
0559:                        subs.setMessageListener(new MessageListener() {
0560:                            public void onMessage(Message msg) {
0561:                                Object e = null;
0562:                                try {
0563:                                    e = ((ObjectMessage) msg).getObject();
0564:                                    if (logger.isDebugEnabled()) {
0565:                                        logger
0566:                                                .debug("EventSubscriber "
0567:                                                        + this 
0568:                                                        + " received "
0569:                                                        + e
0570:                                                        + " with processKey="
0571:                                                        + msg
0572:                                                                .getStringProperty("processKey")
0573:                                                        + " and eventType="
0574:                                                        + msg
0575:                                                                .getStringProperty("eventType"));
0576:                                    }
0577:                                    handler.receiveEvent((WfAuditEvent) e);
0578:                                } catch (JMSException ex) {
0579:                                    logger.error(ex.getMessage(), ex);
0580:                                } catch (InvalidPerformerException ex) {
0581:                                    // deliberatly ignored.
0582:                                } catch (RemoteException ex) {
0583:                                    // deliberatly ignored.
0584:                                } catch (Exception ex) {
0585:                                    logger.error(this  + "cannot handle " + e
0586:                                            + ": " + ex.getMessage(), ex);
0587:                                }
0588:                            }
0589:                        });
0590:                    } catch (JMSException e) {
0591:                        throw (IOException) (new IOException(e.getMessage()))
0592:                                .initCause(e);
0593:                    }
0594:                }
0595:            }
0596:
0597:            /* Comment copied from interface. */
0598:            public Map serviceProperties() throws RemoteException {
0599:                return serviceProperties;
0600:            }
0601:
0602:            /* Comment copied from interface. */
0603:            public Configuration configuration() throws RemoteException {
0604:                return engine.configuration();
0605:            }
0606:
0607:            /* Comment copied from interface. */
0608:            public ProcessDefinitionDirectory processDefinitionDirectory()
0609:                    throws RemoteException {
0610:                return engine.processDefinitionDirectory();
0611:            }
0612:
0613:            /* Comment copied from interface. */
0614:            public ProcessDirectory processDirectory() throws RemoteException {
0615:                return engine.processDirectory();
0616:            }
0617:
0618:            /* Comment copied from interface. */
0619:            public Collection knownResources() throws RemoteException {
0620:                return engine.knownResources();
0621:            }
0622:
0623:            /* Comment copied from interface. */
0624:            public WfResource resourceByKey(String key)
0625:                    throws InvalidKeyException, RemoteException {
0626:                return engine.resourceByKey(key);
0627:            }
0628:
0629:            /* Comment copied from interface. */
0630:            public Collection authorizers(WfResource resource)
0631:                    throws RemoteException {
0632:                return engine.authorizers(resource);
0633:            }
0634:
0635:            /* Comment copied from interface. */
0636:            public WfResource asResource(Principal principal)
0637:                    throws RemoteException, InvalidKeyException {
0638:                return engine.asResource(principal);
0639:            }
0640:
0641:            /* Comment copied from interface. */
0642:            public Collection requestedBy(WfRequester req)
0643:                    throws RemoteException {
0644:                return ((ExtProcessDirectory) engine.processDirectory())
0645:                        .requestedBy(req);
0646:            }
0647:
0648:            /* Comment copied from interface. */
0649:            public void doFinish(WfActivity act, ProcessData result)
0650:                    throws InvalidDataException, CannotCompleteException,
0651:                    RemoteException {
0652:                engine.doFinish(act, result);
0653:            }
0654:
0655:            /* Comment copied from interface. */
0656:            public Object executeBatch(Batch batch) throws RemoteException,
0657:                    InvocationTargetException {
0658:                return engine.executeBatch(batch);
0659:            }
0660:
0661:            /* Comment copied from interface. */
0662:            public WfObject eventReceiver(WfAuditHandler handler)
0663:                    throws RemoteException {
0664:                try {
0665:                    EventSubscriber es = new EventSubscriberImpl(null, null);
0666:                    es.setEventHandler(handler);
0667:                    return es;
0668:                } catch (IOException e) {
0669:                    // This mapping is wrong but backward compatible, and the
0670:                    // method now deprecated anyway.
0671:                    throw (RemoteException) (new RemoteException(e.getMessage()))
0672:                            .initCause(e);
0673:                }
0674:            }
0675:
0676:            /* Comment copied from interface. */
0677:            public EventSubscriber createEventSubscriber() throws IOException {
0678:                return new EventSubscriberImpl(null, null);
0679:            }
0680:
0681:            /* Comment copied from interface. */
0682:            public EventSubscriber createEventSubscriber(String processKey,
0683:                    String eventTypes) throws IOException {
0684:                return new EventSubscriberImpl(processKey, eventTypes);
0685:            }
0686:
0687:            /* Comment copied from interface. */
0688:            public void registerRequester(WfRequester requester)
0689:                    throws RemoteException {
0690:                synchronized (this ) {
0691:                    if (reqEvtRec == null) {
0692:                        try {
0693:                            reqEvtRec = createEventSubscriber();
0694:                            reqEvtRec.setEventHandler(this );
0695:                        } catch (IOException e) {
0696:                            throw (IllegalStateException) (new IllegalStateException(
0697:                                    e.getMessage())).initCause(e);
0698:                        }
0699:                        reqsByProcKey = new HashMap();
0700:                        procKeysByReq = new HashMap();
0701:                        ignoredProcs = new ArrayList();
0702:                        reqRefQueue = new ReferenceQueue();
0703:                        reqRefCleaner = new Thread() {
0704:                            public void run() {
0705:                                while (true) {
0706:                                    try {
0707:                                        Reference ref = reqRefQueue.remove();
0708:                                        synchronized (StandardWorkflowService.this ) {
0709:                                            Set procKeys = (Set) procKeysByReq
0710:                                                    .remove(ref);
0711:                                            for (Iterator i = procKeys
0712:                                                    .iterator(); i.hasNext();) {
0713:                                                reqsByProcKey.remove((String) i
0714:                                                        .next());
0715:                                            }
0716:                                        }
0717:                                    } catch (InterruptedException e) {
0718:                                        // deliberatly ignored
0719:                                    }
0720:                                }
0721:                            }
0722:                        };
0723:                        reqRefCleaner.setDaemon(true);
0724:                        reqRefCleaner.start();
0725:                    }
0726:                    procKeysByReq.put(
0727:                            new WeakReference(requester, reqRefQueue),
0728:                            new HashSet());
0729:                    // we could demand a requester to be registered before first
0730:                    // use and thus avoid rebuilding ignoredProcs after every
0731:                    // registration. But then we would need elaborate code to
0732:                    // clean out old process keys from the ignoredProcs set...
0733:                    ignoredProcs.clear();
0734:                }
0735:            }
0736:
0737:            /**
0738:             * Receive an event and distribute it to the appropriate handler.
0739:             *
0740:             * @param evt the event.
0741:             * @throws InvalidPerformerException thrown by the derived
0742:             * {@link de.danet.an.workflow.omgcore.WfRequester 
0743:             * <code>WfRequester</code>} if it receives an event from a 
0744:             * process that is not among its performers.
0745:             * @throws RemoteException if a system-level error occurs.
0746:             */
0747:            public void receiveEvent(WfAuditEvent evt)
0748:                    throws InvalidPerformerException, RemoteException {
0749:                try {
0750:                    String procKey = evt.processKey();
0751:                    WfRequester requester = null;
0752:                    synchronized (this ) {
0753:                        WeakReference requesterRef = (WeakReference) reqsByProcKey
0754:                                .get(procKey);
0755:                        if (requesterRef != null) {
0756:                            requester = (WfRequester) requesterRef.get();
0757:                        }
0758:                        if (requester == null) {
0759:                            // if we know that none of ours handles it, discard
0760:                            if (ignoredProcs.contains(procKey)) {
0761:                                return;
0762:                            }
0763:                            // try to find handler
0764:                            found: for (Iterator i = procKeysByReq.keySet()
0765:                                    .iterator(); i.hasNext();) {
0766:                                WeakReference reqRef = (WeakReference) i.next();
0767:                                WfRequester req = (WfRequester) reqRef.get();
0768:                                if (req == null) {
0769:                                    continue;
0770:                                }
0771:                                Set assocKeys = (Set) procKeysByReq.get(reqRef);
0772:                                Collection perfs = ((WfRequester) req)
0773:                                        .performers();
0774:                                for (Iterator p = perfs.iterator(); p.hasNext();) {
0775:                                    WfProcess perf = (WfProcess) p.next();
0776:                                    String pk = perf.key();
0777:                                    reqsByProcKey.put(pk, reqRef);
0778:                                    assocKeys.add(pk);
0779:                                    if (pk.equals(procKey)) {
0780:                                        requester = req;
0781:                                        break found;
0782:                                    }
0783:                                }
0784:                            }
0785:                            if (requester == null) {
0786:                                ignoredProcs.add(procKey);
0787:                            }
0788:                        }
0789:                    }
0790:                    // if we know, who handles this, forward (outside sync!)
0791:                    if (requester != null) {
0792:                        requester.receiveEvent(evt);
0793:                    }
0794:                } catch (InvalidPerformerException e) {
0795:                    // deliberately ignored.
0796:                } catch (RemoteException e) {
0797:                    // deliberately ignored.
0798:                }
0799:            }
0800:
0801:            /**
0802:             * Return the channel messages out topic.
0803:             * @return the topic
0804:             */
0805:            Topic channelMessageOutTopic() {
0806:                return channelMessageOutTopic;
0807:            }
0808:
0809:            /* Comment copied from interface. */
0810:            public Channel getChannel(WfProcess process, String channelName)
0811:                    throws RemoteException {
0812:                return new ChannelImpl(this , (Process) process, channelName,
0813:                        false);
0814:            }
0815:
0816:            /* Comment copied from interface. */
0817:            public Channel getChannel(WfProcess process, String channelName,
0818:                    boolean sendOnly) throws RemoteException {
0819:                return new ChannelImpl(this , (Process) process, channelName,
0820:                        sendOnly);
0821:            }
0822:
0823:            /**
0824:             * Free any allocated resources.
0825:             */
0826:            public void release() {
0827:                if (connectionCleanupThread != null) {
0828:                    Runtime.getRuntime().removeShutdownHook(
0829:                            connectionCleanupThread);
0830:                    connectionCleanupThread.run();
0831:                    connectionCleanupThread = null;
0832:                }
0833:                engine = null;
0834:            }
0835:
0836:            /* Comment copied from interface. */
0837:            public void release(WfObject obj) {
0838:                if (obj instanceof  EventSubscriberImpl) {
0839:                    ((EventSubscriberImpl) obj).close();
0840:                    return;
0841:                }
0842:                if (obj instanceof  ChannelImpl) {
0843:                    ((ChannelImpl) obj).release();
0844:                    return;
0845:                }
0846:                if (obj instanceof  StandardWorkflowService) {
0847:                    ((StandardWorkflowService) obj).release();
0848:                    return;
0849:                }
0850:                EJBUtil.removeSession(obj);
0851:            }
0852:
0853:            /* (non-Javadoc)
0854:             * @see de.danet.an.workflow.api.WorkflowService#caller()
0855:             */
0856:            public Principal caller() throws RemoteException {
0857:                return engine.caller();
0858:            }
0859:
0860:            /**
0861:             * This class provides a wrapper around a topic connection that
0862:             * prevents it from being stopped or closed (redefined to noop).
0863:             *
0864:             * @author Michael Lipp
0865:             *
0866:             */
0867:            public static class TopicConnectionWrapper implements 
0868:                    TopicConnection {
0869:                private TopicConnection delegee;
0870:
0871:                /**
0872:                 * Create a new instance with all attributes initialized
0873:                 * to defaults or the given values.
0874:                 *
0875:                 * @param delegee
0876:                 */
0877:                public TopicConnectionWrapper(TopicConnection delegee) {
0878:                    this .delegee = delegee;
0879:                }
0880:
0881:                /**
0882:                 * @throws JMSException
0883:                 * @see javax.jms.Connection#close()
0884:                 */
0885:                public void close() throws JMSException {
0886:                }
0887:
0888:                /**
0889:                 * @param arg0
0890:                 * @param arg1
0891:                 * @param arg2
0892:                 * @param arg3
0893:                 * @return
0894:                 * @throws JMSException
0895:                 * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination, java.lang.String, javax.jms.ServerSessionPool, int)
0896:                 */
0897:                public ConnectionConsumer createConnectionConsumer(
0898:                        Destination arg0, String arg1, ServerSessionPool arg2,
0899:                        int arg3) throws JMSException {
0900:                    return delegee.createConnectionConsumer(arg0, arg1, arg2,
0901:                            arg3);
0902:                }
0903:
0904:                /**
0905:                 * @param arg0
0906:                 * @param arg1
0907:                 * @param arg2
0908:                 * @param arg3
0909:                 * @return
0910:                 * @throws JMSException
0911:                 * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic, java.lang.String, javax.jms.ServerSessionPool, int)
0912:                 */
0913:                public ConnectionConsumer createConnectionConsumer(Topic arg0,
0914:                        String arg1, ServerSessionPool arg2, int arg3)
0915:                        throws JMSException {
0916:                    return delegee.createConnectionConsumer(arg0, arg1, arg2,
0917:                            arg3);
0918:                }
0919:
0920:                /**
0921:                 * @param arg0
0922:                 * @param arg1
0923:                 * @param arg2
0924:                 * @param arg3
0925:                 * @param arg4
0926:                 * @return
0927:                 * @throws JMSException
0928:                 * @see javax.jms.TopicConnection#createDurableConnectionConsumer(javax.jms.Topic, java.lang.String, java.lang.String, javax.jms.ServerSessionPool, int)
0929:                 */
0930:                public ConnectionConsumer createDurableConnectionConsumer(
0931:                        Topic arg0, String arg1, String arg2,
0932:                        ServerSessionPool arg3, int arg4) throws JMSException {
0933:                    return delegee.createDurableConnectionConsumer(arg0, arg1,
0934:                            arg2, arg3, arg4);
0935:                }
0936:
0937:                /**
0938:                 * @param arg0
0939:                 * @param arg1
0940:                 * @return
0941:                 * @throws JMSException
0942:                 * @see javax.jms.Connection#createSession(boolean, int)
0943:                 */
0944:                public Session createSession(boolean arg0, int arg1)
0945:                        throws JMSException {
0946:                    return delegee.createSession(arg0, arg1);
0947:                }
0948:
0949:                /**
0950:                 * @param arg0
0951:                 * @param arg1
0952:                 * @return
0953:                 * @throws JMSException
0954:                 * @see javax.jms.TopicConnection#createTopicSession(boolean, int)
0955:                 */
0956:                public TopicSession createTopicSession(boolean arg0, int arg1)
0957:                        throws JMSException {
0958:                    return delegee.createTopicSession(arg0, arg1);
0959:                }
0960:
0961:                /**
0962:                 * @return
0963:                 * @throws JMSException
0964:                 * @see javax.jms.Connection#getClientID()
0965:                 */
0966:                public String getClientID() throws JMSException {
0967:                    return delegee.getClientID();
0968:                }
0969:
0970:                /**
0971:                 * @return
0972:                 * @throws JMSException
0973:                 * @see javax.jms.Connection#getExceptionListener()
0974:                 */
0975:                public ExceptionListener getExceptionListener()
0976:                        throws JMSException {
0977:                    return delegee.getExceptionListener();
0978:                }
0979:
0980:                /**
0981:                 * @return
0982:                 * @throws JMSException
0983:                 * @see javax.jms.Connection#getMetaData()
0984:                 */
0985:                public ConnectionMetaData getMetaData() throws JMSException {
0986:                    return delegee.getMetaData();
0987:                }
0988:
0989:                /**
0990:                 * @param arg0
0991:                 * @throws JMSException
0992:                 * @see javax.jms.Connection#setClientID(java.lang.String)
0993:                 */
0994:                public void setClientID(String arg0) throws JMSException {
0995:                    delegee.setClientID(arg0);
0996:                }
0997:
0998:                /**
0999:                 * @param arg0
1000:                 * @throws JMSException
1001:                 * @see javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)
1002:                 */
1003:                public void setExceptionListener(ExceptionListener arg0)
1004:                        throws JMSException {
1005:                    delegee.setExceptionListener(arg0);
1006:                }
1007:
1008:                /**
1009:                 * @throws JMSException
1010:                 * @see javax.jms.Connection#start()
1011:                 */
1012:                public void start() throws JMSException {
1013:                    delegee.start();
1014:                }
1015:
1016:                /**
1017:                 * @throws JMSException
1018:                 * @see javax.jms.Connection#stop()
1019:                 */
1020:                public void stop() throws JMSException {
1021:                }
1022:            }
1023:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.