001: /*
002: * ====================================================================
003: *
004: * XFLOW - Process Management System
005: * Copyright (C) 2003 Rob Tan
006: * All rights reserved.
007: *
008: * Redistribution and use in source and binary forms, with or without
009: * modification, are permitted provided that the following conditions
010: * are met:
011: *
012: * 1. Redistributions of source code must retain the above copyright
013: * notice, this list of conditions, and the following disclaimer.
014: *
015: * 2. Redistributions in binary form must reproduce the above copyright
016: * notice, this list of conditions, and the disclaimer that follows
017: * these conditions in the documentation and/or other materials
018: * provided with the distribution.
019: *
020: * 3. The name "XFlow" must not be used to endorse or promote products
021: * derived from this software without prior written permission. For
022: * written permission, please contact rcktan@yahoo.com
023: *
024: * 4. Products derived from this software may not be called "XFlow", nor
025: * may "XFlow" appear in their name, without prior written permission
026: * from the XFlow Project Management (rcktan@yahoo.com)
027: *
028: * In addition, we request (but do not require) that you include in the
029: * end-user documentation provided with the redistribution and/or in the
030: * software itself an acknowledgement equivalent to the following:
031: * "This product includes software developed by the
032: * XFlow Project (http://xflow.sourceforge.net/)."
033: * Alternatively, the acknowledgment may be graphical using the logos
034: * available at http://xflow.sourceforge.net/
035: *
036: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
037: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
038: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
039: * DISCLAIMED. IN NO EVENT SHALL THE XFLOW AUTHORS OR THE PROJECT
040: * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
041: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
042: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
043: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
044: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
045: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
046: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
047: * SUCH DAMAGE.
048: *
049: * ====================================================================
050: * This software consists of voluntary contributions made by many
051: * individuals on behalf of the XFlow Project and was originally
052: * created by Rob Tan (rcktan@yahoo.com)
053: * For more information on the XFlow Project, please see:
054: * <http://xflow.sourceforge.net/>.
055: * ====================================================================
056: */
057:
058: package xflow.server.controller;
059:
060: import java.io.*;
061: import java.util.*;
062: import java.lang.*;
063: import java.sql.*;
064:
065: import xflow.common.*;
066: import xflow.protocol.*;
067: import xflow.util.*;
068: import xflow.messaging.*;
069: import xflow.events.*;
070: import xflow.server.util.*;
071:
072: import org.apache.log4j.Logger;
073:
074: public class WorkflowProcessor {
075:
076: private static Logger log = Logger
077: .getLogger(WorkflowProcessor.class);
078: private static HashMap graphsByGraphId = new HashMap();
079: private static HashMap graphsByNameAndVersion = new HashMap();
080: private static HashMap activeWorkflows;
081: private static HashMap suspendedWorkflows;
082: private static EventsPublisher eventsPublisher = new EventsPublisher();
083:
084: static {
085:
086: try {
087: // Load all active workflows
088: activeWorkflows = WorkflowP
089: .getWorkflows("where isActive = true");
090: // Load all suspended workflows
091: suspendedWorkflows = WorkflowP
092: .getWorkflows("where isActive = true and status = 'SUSPENDED'");
093: // Start timeout thread -- disable timeout handling for now
094: //new TimeoutDetector().start();
095: } catch (XflowException e) {
096: log.error("Unable to load active workflows: "
097: + e.getMessage());
098: }
099: }
100:
101: //
102: // Workflow model-oriented services
103: //
104:
105: public static void deployModel(String xml, String type, String user)
106: throws XflowException {
107:
108: DirectedGraph dg = null;
109:
110: if (type.equals("XFLOW")) {
111: try {
112: dg = XflowXMLParser.parse(xml);
113: dg.validate();
114: } catch (Exception e) {
115: throw new XflowException("Failed to parse XML : "
116: + e.getMessage());
117: }
118: try {
119: dg.saveDB();
120: } catch (Exception e) {
121: throw new XflowException(
122: "Failed to save model to database : "
123: + e.getMessage());
124: }
125:
126: String modelName = dg.getName();
127: int graphId = dg.getGraphId();
128: Node startNode = dg.getRootNode();
129: log.info("Saved model: " + modelName);
130: log.info("graphId is: " + graphId);
131: graphsByGraphId.put(new Integer(graphId), dg);
132: String nameVers = dg.getName() + dg.getVersion();
133: graphsByNameAndVersion.put(nameVers, dg);
134:
135: try {
136: eventsPublisher.publishModelDeployedEvent(modelName, dg
137: .getVersion(), user);
138: } catch (XflowException e) {
139: log.warn("Failed to publish event");
140: }
141: } else {
142: throw new XflowException("Type: " + type
143: + " is not supported.");
144: }
145: }
146:
147: public static Vector getModels() throws XflowException {
148: return WorkflowP.getModels();
149: }
150:
151: public static boolean validateProcess(String workflowName,
152: int workflowVersion, String processName)
153: throws XflowException {
154:
155: boolean result = false;
156: DirectedGraph dg = getGraphByNameAndVersion(workflowName,
157: workflowVersion);
158: Node node = dg.getNode(processName);
159: if (node != null) {
160: result = true;
161: }
162:
163: return result;
164: }
165:
166: public static Vector getProcessNodes(WorkflowId wfId)
167: throws XflowException {
168: int workflowId = wfId.getValue();
169: int gid = WorkflowP.getGraphId(workflowId);
170: DirectedGraph dg = getGraphByGraphId(gid);
171: Vector nodes = null;
172: if (dg != null) {
173: nodes = dg.getNodes(Node.PROCESS);
174: } else {
175: throw new XflowException(
176: "Can't find graph for workflow id: " + workflowId);
177: }
178: return nodes;
179: }
180:
181: public static Node getNodeByName(String workflowName,
182: int workflowVersion, String nodeName) throws XflowException {
183: Node node = null;
184: DirectedGraph dg = getGraphByNameAndVersion(workflowName,
185: workflowVersion);
186: if (dg != null) {
187: node = dg.getNode(nodeName);
188: } else {
189: throw new XflowException("Can't find graph for "
190: + workflowName + ", version: " + workflowVersion);
191: }
192: return node;
193: }
194:
195: //
196: // Workflow-oriented services
197: //
198:
199: public static int startWorkflow(String workflowName, int version,
200: WorkItem witem, String initiator) throws XflowException {
201: int wfId = 0;
202: DirectedGraph dg = getGraphByNameAndVersion(workflowName,
203: version);
204:
205: // Save the work item to db
206: log.info("Saving workitem: " + witem);
207: WorkItemP.saveDB(witem);
208:
209: int graphId = dg.getGraphId();
210: Node startNode = dg.getRootNode();
211:
212: log.info("GraphId: " + graphId);
213:
214: wfId = WorkflowP.saveDB(graphId, workflowName, initiator, -1);
215: WorkflowId workflowId = new WorkflowId(wfId);
216: Integer key = new Integer(workflowId.getValue());
217: activeWorkflows.put(key, workflowId);
218: witem.setWorkflowId(new WorkflowId(wfId));
219:
220: log.info("startNode is :" + startNode);
221: transitionFromStartNode(graphId, workflowName, version,
222: startNode, witem);
223:
224: try {
225: int this Version = -1;
226: if (version == -1) {
227: this Version = DirectedGraph
228: .getLatestVersionNumber(workflowName);
229: }
230: eventsPublisher.publishWorkflowStartedEvent(workflowName,
231: this Version, wfId, -1, initiator, witem);
232: } catch (XflowException e) {
233: log.warn("Failed to publish event");
234: }
235:
236: return wfId;
237: }
238:
239: private static int startContaineeWorkflow(String workflowName,
240: int version, WorkItem witem, String initiator,
241: int parentWorkflowId) throws XflowException {
242:
243: int wfId = 0;
244: DirectedGraph dg = getGraphByNameAndVersion(workflowName,
245: version);
246:
247: // First we must clone the work item to be associated with new workflow instance
248: WorkItem clonedWItem = new WorkItem();
249: clonedWItem.setPayload(witem.getPayload());
250: clonedWItem.setPayloadType(witem.getPayloadType());
251: HashMap p = new HashMap();
252: HashMap wp = witem.getProperties();
253: Iterator pItr = wp.keySet().iterator();
254: while (pItr.hasNext()) {
255: String key = (String) pItr.next();
256: Object val = (Object) wp.get(key);
257: p.put(key, val);
258: }
259: clonedWItem.setProperties(p);
260:
261: // Save the cloned work item to db
262: log.info("Saving cloned workitem: " + clonedWItem);
263: WorkItemP.saveDB(clonedWItem);
264: log.info("Cloned workitem: " + clonedWItem);
265:
266: int graphId = dg.getGraphId();
267: Node startNode = dg.getRootNode();
268:
269: wfId = WorkflowP.saveDB(graphId, workflowName, initiator,
270: parentWorkflowId);
271: log.info("Started containee workflow. Workflow Id is: " + wfId);
272:
273: WorkflowId workflowId = new WorkflowId(wfId);
274: Integer key = new Integer(workflowId.getValue());
275: activeWorkflows.put(key, workflowId);
276: clonedWItem.setWorkflowId(new WorkflowId(wfId));
277:
278: log.info("startNode is :" + startNode);
279: transitionFromStartNode(graphId, workflowName, version,
280: startNode, clonedWItem);
281:
282: try {
283: int this Version = -1;
284: if (version == -1) {
285: this Version = DirectedGraph
286: .getLatestVersionNumber(workflowName);
287: }
288: eventsPublisher.publishWorkflowStartedEvent(workflowName,
289: this Version, graphId, parentWorkflowId, initiator,
290: clonedWItem);
291: } catch (XflowException e) {
292: log.warn("Failed to publish event");
293: }
294:
295: return wfId;
296: }
297:
298: public static void abortWorkflow(WorkflowId wfId, String user)
299: throws XflowException {
300:
301: int graphId = WorkflowP.getGraphId(wfId.getValue());
302: DirectedGraph dg = getGraphByGraphId(graphId);
303:
304: Integer key = new Integer(wfId.getValue());
305: activeWorkflows.remove(key);
306: suspendedWorkflows.remove(key);
307: WorkflowP.abortWorkflow(wfId);
308: int workflowId = wfId.getValue();
309: InboxP.removeWorkItems(workflowId);
310: Waiting.removeProcesses(wfId);
311:
312: try {
313: String workflowName = dg.getName();
314: int version = dg.getVersion();
315: eventsPublisher.publishWorkflowAbortedEvent(workflowName,
316: version, workflowId, user);
317: } catch (XflowException e) {
318: log.warn("Failed to publish event");
319: }
320: }
321:
322: public static void suspendWorkflow(WorkflowId wfId)
323: throws XflowException {
324: int graphId = WorkflowP.getGraphId(wfId.getValue());
325: DirectedGraph dg = getGraphByGraphId(graphId);
326:
327: Integer key = new Integer(wfId.getValue());
328: if (suspendedWorkflows.get(key) != null) {
329: throw new XflowException("Workflow is already suspended");
330: }
331: suspendedWorkflows.put(key, wfId);
332: WorkflowP.suspendWorkflow(wfId);
333:
334: try {
335: String workflowName = dg.getName();
336: int version = dg.getVersion();
337: eventsPublisher.publishWorkflowSuspendedEvent(workflowName,
338: version, wfId.getValue(), "system");
339: } catch (XflowException e) {
340: log.warn("Failed to publish event");
341: }
342: }
343:
344: public static void resumeWorkflow(WorkflowId wfId)
345: throws XflowException {
346:
347: int graphId = WorkflowP.getGraphId(wfId.getValue());
348: DirectedGraph dg = getGraphByGraphId(graphId);
349:
350: Integer key = new Integer(wfId.getValue());
351: if (suspendedWorkflows.get(key) == null) {
352: throw new XflowException(
353: "Workflow is not currently suspended");
354: }
355: suspendedWorkflows.remove(key);
356: WorkflowP.resumeWorkflow(wfId);
357:
358: try {
359: String workflowName = dg.getName();
360: int version = dg.getVersion();
361: eventsPublisher.publishWorkflowResumedEvent(workflowName,
362: version, wfId.getValue(), "system");
363: } catch (XflowException e) {
364: log.warn("Failed to publish event");
365: }
366: }
367:
368: public static WorkflowState getWorkflowState(WorkflowId wfId)
369: throws XflowException {
370: return WorkflowP.getWorkflowState(wfId);
371: }
372:
373: public static void setVariable(WorkflowId workflowId, String name,
374: Object value) throws XflowException {
375: int graphId = WorkflowP.getGraphId(workflowId.getValue());
376: DirectedGraph dg = getGraphByGraphId(graphId);
377:
378: int wid = workflowId.getId();
379: Integer key = new Integer(wid);
380: if (activeWorkflows.get(key) == null) {
381: throw new XflowException(
382: "Workflow ID is not active or valid");
383: }
384: WorkflowP.setVariable(workflowId.getValue(), name, value);
385:
386: try {
387: String workflowName = dg.getName();
388: int version = dg.getVersion();
389: eventsPublisher.publishVariableUpdatedEvent(workflowName,
390: version, wid, name, value);
391: } catch (XflowException e) {
392: log.warn("Failed to publish event");
393: }
394: }
395:
396: public static Object getVariable(WorkflowId workflowId, String name)
397: throws XflowException {
398: return WorkflowP.getVariable(workflowId.getValue(), name);
399: }
400:
401: public static Vector getActiveWorkflows() throws XflowException {
402: Vector v = new Vector();
403: WorkflowState ws = null;
404: HashMap wflowIds = WorkflowP
405: .getWorkflows("where isActive = true");
406: Iterator itr = wflowIds.values().iterator();
407: while (itr.hasNext()) {
408: WorkflowId wfId = (WorkflowId) itr.next();
409: ws = getWorkflowState(wfId);
410: v.addElement(ws);
411: }
412: return v;
413: }
414:
415: public static Vector getAllWorkflows() throws XflowException {
416: Vector v = new Vector();
417: WorkflowState ws = null;
418: HashMap wflowIds = WorkflowP.getWorkflows("");
419: Iterator itr = wflowIds.values().iterator();
420: while (itr.hasNext()) {
421: WorkflowId wfId = (WorkflowId) itr.next();
422: ws = getWorkflowState(wfId);
423: v.addElement(ws);
424: }
425: return v;
426: }
427:
428: public static Vector getWorkflowsByName(String name)
429: throws XflowException {
430: Vector v = new Vector();
431: WorkflowState ws = null;
432: HashMap wflowIds = WorkflowP.getWorkflowsByName(name);
433: Iterator itr = wflowIds.values().iterator();
434: while (itr.hasNext()) {
435: WorkflowId wfId = (WorkflowId) itr.next();
436: ws = getWorkflowState(wfId);
437: v.addElement(ws);
438: }
439: return v;
440: }
441:
442: //
443: // Process-oriented services
444: //
445:
446: public static void completeWorkItem(String workflowName,
447: int workflowVersion, String processName, WorkItem witem)
448: throws XflowException {
449:
450: // Begin Validation
451: log.info("In CompleteWorkItem.");
452: log.info("workflowName: " + workflowName);
453: log.info("processName: " + processName);
454:
455: log.info(" Validating Work Item: " + witem);
456:
457: WorkItemId wid = witem.getId();
458: if (wid == null) {
459: throw new XflowException(
460: "Cannot complete work item. Null workitem Id");
461: }
462:
463: WorkflowId wfId = witem.getWorkflowId();
464: if (wfId == null) {
465: throw new XflowException(
466: "Cannot complete work item. Null workflow Id");
467: }
468:
469: int workflowId = wfId.getId();
470: Integer key = new Integer(workflowId);
471:
472: if (activeWorkflows.get(key) == null) {
473: throw new XflowException(
474: "Cannot complete work item. Workflow instance is not active");
475: }
476:
477: if (suspendedWorkflows.get(key) != null) {
478: throw new XflowException(
479: "Cannot complete work item. Workflow instance has been suspended");
480: }
481:
482: int gid = -1;
483: try {
484: gid = DirectedGraph.getGraphId(workflowName,
485: workflowVersion);
486: } catch (Exception e) {
487: throw new XflowException(e.getMessage());
488: }
489:
490: if (!InboxP.isWorkItemValid(gid, processName, witem)) {
491: throw new XflowException(
492: "Cannot complete work item. Invalid work item state.");
493: }
494:
495: log.info("workflow ID: " + workflowId);
496:
497: // End Validation
498:
499: log
500: .info("Work Item passed validation. Now attempting to complete work item");
501:
502: DirectedGraph dg = getGraphByGraphId(gid);
503: Node rootNode = dg.getRootNode();
504: Node this Node = rootNode.getNode(processName);
505:
506: transitionFrom(gid, workflowName, workflowVersion, this Node,
507: witem);
508:
509: }
510:
511: public static Vector getWorkItems(String wfName, String procName)
512: throws XflowException {
513: return WorkItemP.getWorkItems(wfName, procName);
514: }
515:
516: public static WorkItem getNextWorkItem(String wfName,
517: String procName) throws XflowException {
518: return WorkItemP.getNextWorkItem(wfName, procName);
519: }
520:
521: public static WorkItem getWorkItem(WorkItemId wid, String procName)
522: throws XflowException {
523: return WorkItemP.getWorkItem(wid, procName);
524: }
525:
526: //
527: // Auxiliary Methods
528: //
529:
530: private static DirectedGraph getGraphByGraphId(int gid)
531: throws XflowException {
532: DirectedGraph dg = (DirectedGraph) graphsByGraphId
533: .get(new Integer(gid));
534: if (dg == null) {
535: try {
536: dg = DirectedGraph.loadByGraphId(gid);
537: graphsByGraphId.put(new Integer(gid), dg);
538: String nameVers = dg.getName() + dg.getVersion();
539: graphsByNameAndVersion.put(nameVers, dg);
540: } catch (Exception e) {
541: throw new XflowException(e.getMessage());
542: }
543: }
544: return dg;
545: }
546:
547: private static DirectedGraph getGraphByNameAndVersion(String name,
548: int version) throws XflowException {
549: DirectedGraph dg = (DirectedGraph) graphsByNameAndVersion
550: .get(name + version);
551: if (dg == null) {
552: log.info("Loading workflow: " + name + " " + version);
553: dg = new DirectedGraph(name, version);
554: try {
555: dg.loadDB();
556: } catch (Throwable c) {
557: throw new XflowException(
558: "Failed to load workflow from database.");
559: }
560: int gid = dg.getGraphId();
561: graphsByGraphId.put(new Integer(gid), dg);
562: String nameVers = dg.getName() + dg.getVersion();
563: graphsByNameAndVersion.put(nameVers, dg);
564: }
565: return dg;
566: }
567:
568: private static void processContainer(int gid, Node containerNode,
569: WorkItem witem) throws XflowException {
570:
571: log.info("in processContainer");
572:
573: WorkflowId wfId = witem.getWorkflowId();
574: int workflowId = wfId.getId();
575: String containee = containerNode.getContainee();
576: int containeeVersion = containerNode.getContaineeVersion();
577:
578: log.info("containee name: " + containee);
579: log.info("containee version: " + containeeVersion);
580:
581: DirectedGraph dg = getGraphByNameAndVersion(containee,
582: containeeVersion);
583: int containeeGid = dg.getGraphId();
584:
585: log.info("Successfully loaded graph");
586:
587: int graphId = dg.getGraphId();
588: Node startNode = dg.getRootNode();
589: Node endNode = dg.getEndNode();
590:
591: log.info("graphId: " + graphId);
592: log.info("startNode: " + endNode);
593: log.info("endNode: " + endNode);
594:
595: // If container does not have any destinations -- we spawn a new workflow thread
596: if (containerNode.getDestinations().size() == 0) {
597: log.info("Starting containee workflow: " + containee
598: + " version: " + containeeVersion);
599: startContaineeWorkflow(containee, containeeVersion, witem,
600: "System", workflowId);
601: } else {
602: // Otherwise, execute containee workflow in the current thread
603:
604: // Push to process stack
605: ProcessStack.push(workflowId, gid, containerNode, endNode);
606:
607: // Transition from start node
608: transitionFromStartNode(containeeGid, containee,
609: containeeVersion, startNode, witem);
610: }
611: }
612:
613: private static boolean evaluateRule(WorkItem witem, String rule)
614: throws XflowException {
615:
616: boolean result = true;
617: if (rule != null && !rule.equals("") && !rule.equals("always")) {
618: log.info("Evaluating rule: " + rule);
619: if (RuleEngine.evaluate(witem, rule) == false) {
620: result = false;
621: }
622: log.info("Rule: " + rule + " evaluated to: " + result);
623: }
624: return result;
625: }
626:
627: private static void transitionFrom(int gid, String workflowName,
628: int workflowVersion, Node fromNode, WorkItem witem)
629: throws XflowException {
630:
631: Vector destv = fromNode.getDestinations();
632: String processName = fromNode.getNodeName();
633: WorkflowId wfId = witem.getWorkflowId();
634: int workflowId = wfId.getId();
635:
636: log.info("Transitioning from: " + fromNode.getNodeId() + " "
637: + fromNode.getNodeName());
638: log.info("From node has: " + destv.size() + " destinations");
639:
640: // Place workitem in destination nodes inbox
641: for (int i = 0; i < destv.size(); i++) {
642: xflow.common.Destination dest = (xflow.common.Destination) destv
643: .elementAt(i);
644:
645: log.info("Processing destination " + i);
646:
647: // Evaluate rule
648: if (!evaluateRule(witem, dest.rule)) {
649: log
650: .info("This destination's rule evaluated to false. Not going there");
651: continue;
652: }
653:
654: Node node = dest.node;
655: String nodeType = node.getNodeType();
656:
657: log.info("This destination node is: " + node.getNodeId()
658: + " " + node.getNodeName());
659: log.info("This destination node type is: "
660: + node.getNodeType());
661:
662: // End nodes don't have inboxes -- so just remove from previous inbox and mark workflow
663: // as completed.
664: if (nodeType.equals(Node.END)) {
665:
666: log.info("Processing END node");
667: InboxP.removeWorkItem(gid, processName, witem);
668:
669: // Unwind if necessary
670: PopNode popNode = ProcessStack.pop(workflowId, node);
671:
672: // Mark workflow ended only if we didn't unwind.
673: if (popNode == null) {
674: Integer key = new Integer(workflowId);
675: activeWorkflows.remove(key);
676: WorkflowP.setCompleted(workflowId);
677:
678: try {
679:
680: int this Version = -1;
681: if (workflowVersion == -1) {
682: this Version = DirectedGraph
683: .getLatestVersionNumber(workflowName);
684: }
685: // Publish workflow completed event
686: eventsPublisher.publishWorkflowCompletedEvent(
687: workflowName, this Version, workflowId,
688: "system");
689: } catch (XflowException e) {
690: log.warn("Failed to publish event");
691: }
692: } else {
693: log
694: .info("Transitioning to unwoundNode's destinations.");
695:
696: // Transition to popped node's destination(s)
697: int cGid = popNode.gid;
698: int cNodeId = popNode.nodeId;
699: log
700: .info("cGid = " + cGid + " cNodeId = "
701: + cNodeId);
702: DirectedGraph dg = getGraphByGraphId(cGid);
703: log.info("Got graph");
704: Node cNode = dg.getNode(cNodeId);
705: log.info("cNode = " + cNode);
706: String cWorkflowName = dg.getName();
707: int cVersion = dg.getVersion();
708: log.info("cWorkflowName = " + cWorkflowName
709: + " Version = " + cVersion);
710: transitionFrom(cGid, cWorkflowName, cVersion,
711: cNode, witem);
712: }
713: continue;
714: }
715:
716: String nextProcessName = node.getNodeName();
717: String nextProcessType = node.getNodeType();
718:
719: // Handle Or node
720: if (nextProcessType.equals(Node.OR)) {
721:
722: log.info("Processing OR node");
723: Vector orDest = node.getDestinations();
724: Node orDestNode = ((xflow.common.Destination) orDest
725: .elementAt(0)).node;
726: nextProcessName = orDestNode.getNodeName();
727: nextProcessType = orDestNode.getNodeType();
728: if (nextProcessType.equals(Node.PROCESS)) {
729: log.info("Transitioning to: " + nextProcessName);
730: transitionTo(gid, workflowName, workflowId,
731: workflowVersion, processName,
732: nextProcessName, witem);
733: }
734: // Handle And node
735: } else if (nextProcessType.equals(Node.AND)) {
736: log.info("Processing AND node");
737: int destNodeId = node.getNodeId();
738: int fromNodeId = fromNode.getNodeId();
739: Waiting.addProcess(wfId, destNodeId, fromNodeId);
740: InboxP.removeWorkItem(gid, processName, witem);
741:
742: Vector fromNodes = node.getFromNodes();
743: if (Waiting.allProcessesArrived(fromNodes, wfId,
744: destNodeId)) {
745: log.info("Waiting on AND node completed.");
746: Waiting.removeProcesses(wfId);
747: transitionFrom(gid, workflowName, workflowVersion,
748: node, witem);
749: }
750: } else if (nextProcessType.equals(Node.PROCESS)) {
751: log.info("Transitioning to: " + nextProcessName);
752: transitionTo(gid, workflowName, workflowId,
753: workflowVersion, processName, nextProcessName,
754: witem);
755: } else if (nextProcessType.equals(Node.CONTAINER)) {
756: // This is a container for another workflow
757: log.info("Processing CONTAINER node");
758: processContainer(gid, node, witem);
759: }
760: }
761: }
762:
763: private static void transitionFromStartNode(int gid,
764: String workflowName, int workflowVersion, Node startNode,
765: WorkItem witem) throws XflowException {
766:
767: // Get the destinations from start node
768: Vector destv = startNode.getDestinations();
769:
770: // Place workitem in destination nodes inbox
771: for (int i = 0; i < destv.size(); i++) {
772: xflow.common.Destination dest = (xflow.common.Destination) destv
773: .elementAt(i);
774:
775: // Evaluate rule
776: if (!evaluateRule(witem, dest.rule)) {
777: continue;
778: }
779:
780: Node node = dest.node;
781: String nodeType = node.getNodeType();
782:
783: // End nodes don't have inboxes -- so ignore
784: if (nodeType.equals(Node.END)) {
785: continue;
786: }
787: String procName = node.getNodeName();
788: log.info("Adding workitem to inbox for proc: " + procName);
789:
790: // Add the work item to the process's inbox
791: InboxP.addWorkItem(gid, workflowName, procName, witem);
792:
793: // Send notification to destination
794: log.info("Transition From Start Node");
795: log.info("Sending inbox notification:");
796: log.info(" workflowName: " + workflowName);
797: log.info(" procName: " + procName);
798: log.info(" witem: " + witem.getId());
799: sendInboxNotification(workflowName, procName, witem);
800: }
801: }
802:
803: private static void transitionTo(int gid, String workflowName,
804: int workflowId, int workflowVersion, String processName,
805: String nextProcessName, WorkItem witem)
806: throws XflowException {
807:
808: InboxP.removeWorkItem(gid, processName, witem);
809: InboxP.addWorkItem(gid, workflowName, nextProcessName, witem);
810:
811: log.info("TransitionTo");
812: log.info("Sending inbox notification:");
813: log.info(" workflowName: " + workflowName);
814: log.info(" procName: " + nextProcessName);
815: log.info(" witem: " + witem.getId());
816:
817: // Send notification to destination
818: sendInboxNotification(workflowName, nextProcessName, witem);
819:
820: try {
821: int this Version = -1;
822: if (workflowVersion == -1) {
823: this Version = DirectedGraph
824: .getLatestVersionNumber(workflowName);
825: }
826: eventsPublisher.publishNodeTransitionEvent(workflowName,
827: this Version, workflowId, processName,
828: nextProcessName, witem);
829: } catch (XflowException e) {
830: log.warn("Failed to publish event");
831: }
832:
833: }
834:
835: private static void sendInboxNotification(String workflowName,
836: String procName, WorkItem witem) {
837: try {
838: ByteArrayOutputStream out = new ByteArrayOutputStream();
839: ObjectOutputStream s = new ObjectOutputStream(out);
840: s.writeObject(witem);
841: s.flush();
842: byte[] barr = out.toByteArray();
843:
844: Vector props = new Vector();
845: MessageProperty mp = new MessageProperty();
846: mp.name = "ProcessName";
847: mp.value = workflowName + procName;
848: props.addElement(mp);
849: JMSPublisher.send(XflowConstants.XFLOW_TOPIC, barr, props);
850: } catch (Exception e) {
851: e.printStackTrace();
852: }
853: }
854:
855: }
|