0001: /*--
0002:
0003: Copyright (C) 2002-2005 Adrian Price.
0004: All rights reserved.
0005:
0006: Redistribution and use in source and binary forms, with or without
0007: modification, are permitted provided that the following conditions
0008: are met:
0009:
0010: 1. Redistributions of source code must retain the above copyright
0011: notice, this list of conditions, and the following disclaimer.
0012:
0013: 2. Redistributions in binary form must reproduce the above copyright
0014: notice, this list of conditions, and the disclaimer that follows
0015: these conditions in the documentation and/or other materials
0016: provided with the distribution.
0017:
0018: 3. The names "OBE" and "Open Business Engine" must not be used to
0019: endorse or promote products derived from this software without prior
0020: written permission. For written permission, please contact
0021: adrianprice@sourceforge.net.
0022:
0023: 4. Products derived from this software may not be called "OBE" or
0024: "Open Business Engine", nor may "OBE" or "Open Business Engine"
0025: appear in their name, without prior written permission from
0026: Adrian Price (adrianprice@users.sourceforge.net).
0027:
0028: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
0029: WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
0030: OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
0031: DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT,
0032: INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
0033: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
0034: SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
0035: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
0036: STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
0037: IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0038: POSSIBILITY OF SUCH DAMAGE.
0039:
0040: For more information on OBE, please see
0041: <http://obe.sourceforge.net/>.
0042:
0043: */
0044:
0045: package org.obe.engine;
0046:
0047: import org.apache.commons.logging.Log;
0048: import org.apache.commons.logging.LogFactory;
0049: import org.obe.OBERuntimeException;
0050: import org.obe.client.api.repository.EventTypeMetaData;
0051: import org.obe.client.api.repository.ObjectNotFoundException;
0052: import org.obe.client.api.repository.RepositoryException;
0053: import org.obe.client.api.tool.Parameter;
0054: import org.obe.client.api.tool.ToolAgent;
0055: import org.obe.client.api.tool.ToolInvocation;
0056: import org.obe.engine.util.PersistentArrayIterator;
0057: import org.obe.spi.evaluator.Evaluator;
0058: import org.obe.spi.evaluator.EvaluatorException;
0059: import org.obe.spi.model.*;
0060: import org.obe.spi.service.*;
0061: import org.obe.util.WorkflowUtilities;
0062: import org.obe.xpdl.model.activity.*;
0063: import org.obe.xpdl.model.condition.Condition;
0064: import org.obe.xpdl.model.condition.ConditionType;
0065: import org.obe.xpdl.model.data.ActualParameter;
0066: import org.obe.xpdl.model.data.FormalParameter;
0067: import org.obe.xpdl.model.data.ParameterMode;
0068: import org.obe.xpdl.model.data.Type;
0069: import org.obe.xpdl.model.ext.*;
0070: import org.obe.xpdl.model.misc.ConformanceClass;
0071: import org.obe.xpdl.model.misc.Deadline;
0072: import org.obe.xpdl.model.misc.ExecutionType;
0073: import org.obe.xpdl.model.misc.GraphConformance;
0074: import org.obe.xpdl.model.participant.Participant;
0075: import org.obe.xpdl.model.participant.ParticipantType;
0076: import org.obe.xpdl.model.transition.*;
0077: import org.obe.xpdl.model.workflow.WorkflowProcess;
0078: import org.wfmc.wapi.*;
0079:
0080: import java.util.*;
0081:
0082: /**
0083: * Implements the workflow enactment logic.
0084: *
0085: * @author Anthony Eden
0086: * @author Adrian Price
0087: */
0088: final class WorkflowRunner {
0089: private static final Log _logger = LogFactory
0090: .getLog(WorkflowRunner.class);
0091: private static final String[] EMPTY_PARTICPANTS = {};
0092:
0093: private final EngineContext _ctx;
0094: private final ServiceManager _svcMgr;
// A stack (LIFO) of activities awaiting execution.
0096: private final Stack _activityStack = new Stack();
0097: // Flag that indicates whether the run() method is running.
0098: private boolean _running;
0099: // Cache the evaluator in case of re-use.
0100: private Evaluator _evaluator;
0101:
0102: // This class is used to enqueue activities for execution by the main loop.
0103: private static final class ActivityContext {
0104: private final Activity activity;
0105: private final ActivityInstance instance;
0106:
0107: ActivityContext(Activity activity, ActivityInstance instance) {
0108: this .activity = activity;
0109: this .instance = instance;
0110: }
0111: }
0112:
// Creates a runner that uses the given service manager and operates on the
// given engine context.
WorkflowRunner(ServiceManager svcMgr, EngineContext ctx) {
    this._ctx = ctx;
    this._svcMgr = svcMgr;
}
0117:
0118: /**
0119: * Sets completed state, clears event watches, executes transitions.
0120: */
0121: void completeActivity(Activity activity,
0122: ActivityInstance activityInstance, boolean run)
0123: throws WMWorkflowException {
0124:
0125: if (_logger.isDebugEnabled()) {
0126: _logger.debug("Completing activity '" + activity.getId()
0127: + "' instance "
0128: + activityInstance.getActivityInstanceId()
0129: + " in process '" + _ctx.getWorkflow().getId()
0130: + "' instance "
0131: + _ctx.getProcessInstance().getProcessInstanceId());
0132: }
0133:
0134: // Activity is now complete.
0135: activityInstance.setCompletedDate(new Date());
0136:
0137: // Make sure the process instance activity dates match.
0138: _ctx.getEngine().recomputeDates(_ctx.getProcessInstance(),
0139: activityInstance);
0140:
0141: // For NON_BLOCKED workflows we must reset the join state now, because
0142: // otherwise the join will not fire a second time, thus preventing
0143: // cycles from occurring. In LOOP_BLOCKED and FULL_BLOCKED modes the
0144: // join reset is unnecessary because the loop controller automatically
0145: // resets all joins within the loop before commencing a new iteration.
0146: ConformanceClass conformanceClass = _ctx.getWorkflow()
0147: .getPackage().getConformanceClass();
0148: GraphConformance graphConformance = conformanceClass == null ? null
0149: : conformanceClass.getGraphConformance();
0150: if (graphConformance == null
0151: || graphConformance == GraphConformance.NON_BLOCKED) {
0152:
0153: JoinInstance join = activityInstance.getJoin();
0154: if (join != null)
0155: join.reset();
0156: }
0157:
0158: // Set the activity status to closed.completed (also notifies activity
0159: // instance listeners). Open work items are terminated.
0160: _ctx.getEngine().cascadeActivityInstanceState(
0161: _ctx.getWorkflow(), activity, activityInstance,
0162: WMActivityInstanceState.CLOSED_COMPLETED_INT, true,
0163: WMWorkItemState.CLOSED_TERMINATED_INT, false, false);
0164:
0165: // Unsubscribe the activity instance from all application events.
0166: try {
0167: _svcMgr.getApplicationEventBroker().unsubscribe(
0168: new String[] {
0169: activityInstance.getProcessDefinitionId(),
0170: activityInstance.getProcessInstanceId(),
0171: activityInstance.getActivityInstanceId() },
0172: false);
0173: } catch (RepositoryException e) {
0174: throw new WMWorkflowException(e);
0175: }
0176:
0177: // If an exit activity, find its encompassing block activity and
0178: // determine if it is complete. If it is not complete (as may be the
0179: // case for a looping block activity), enqueue it for re-execution.
0180: // If the encompassing block activity is complete, recursively mark it
0181: // so. If there is no encompassing block activity, the workflow is
0182: // complete. If not an exit activity, fire its efferent transitions.
0183: if (activity.isExitActivity()) {
0184: // If this activity belongs to an activity set.
0185: String outerActivityInstId = activityInstance
0186: .getBlockActivityInstanceId();
0187: if (outerActivityInstId == null) {
0188: // No outer block - must be the end of the workflow.
0189: completeProcess();
0190: } else {
0191: // Find outer block activity definition and instance.
0192: activityInstance = findActivityInstance(outerActivityInstId);
0193: String outerActivityDefId = activityInstance
0194: .getActivityDefinitionId();
0195: activity = WorkflowUtilities.findActivity(_ctx
0196: .getWorkflow(), outerActivityDefId);
0197:
0198: // If the outer block has a loop, we must determine whether
0199: // another iteration is required.
0200: if (isBlockActivityComplete(activity, activityInstance)) {
0201: // The block activity is now complete, mark it so.
0202: completeActivity(activity, activityInstance, false);
0203: } else {
0204: // Otherwise, enqueue it for re-execution.
0205: pushActivity(activity, activityInstance);
0206: }
0207: }
0208: } else {
0209: // This isn't an exit activity, so fire the efferent transitions.
0210: executeTransitions(activity, activityInstance);
0211: }
0212:
0213: // Make sure the activities get executed.
0214: if (run)
0215: run();
0216: }
0217:
0218: void completeProcess() throws WMWorkflowException {
0219: ProcessInstance processInstance = _ctx.getProcessInstance();
0220:
0221: if (_logger.isDebugEnabled()) {
0222: _logger.debug("Completing process '"
0223: + _ctx.getWorkflow().getId() + "' instance "
0224: + processInstance.getProcessInstanceId());
0225: }
0226:
0227: // Unsubscribe the process instance from all application events.
0228: try {
0229: _svcMgr.getApplicationEventBroker().unsubscribe(
0230: new String[] {
0231: processInstance.getProcessDefinitionId(),
0232: processInstance.getProcessInstanceId() },
0233: false);
0234: } catch (RepositoryException e) {
0235: throw new WMWorkflowException(e);
0236: }
0237:
0238: // Set the process instance status to closed.completed
0239: // (also notifies process instance listeners).
0240: processInstance.setCompletedDate(new Date());
0241: _ctx.getEngine().cascadeProcessInstanceState(
0242: _ctx.getWorkflow(), processInstance,
0243: WMProcessInstanceState.CLOSED_COMPLETED_INT, true,
0244: WMObjectState.DEFAULT_INT, false,
0245: WMObjectState.DEFAULT_INT, false, false);
0246:
0247: // Remove any enqueued activities from the execution stack.
0248: _activityStack.clear();
0249:
0250: // If there's a parent process, we must return the output results to it.
0251: ActivityInstance parentActivityInstance = processInstance
0252: .getParentActivityInstance();
0253: if (parentActivityInstance != null)
0254: completeParentActivity(parentActivityInstance);
0255: }
0256:
0257: private void completeParentActivity(
0258: ActivityInstance parentActivityInstance)
0259: throws WMWorkflowException {
0260:
0261: ProcessInstance parentProcessInstance = parentActivityInstance
0262: .getProcessInstance();
0263: String parentProcessDefinitionId = parentProcessInstance
0264: .getProcessDefinitionId();
0265: String parentProcessInstanceId = parentActivityInstance
0266: .getProcessInstanceId();
0267:
0268: // We need the definition of the parent activity so we can return the
0269: // output parameters.
0270: WorkflowProcess parentWorkflow;
0271: Activity parentActivity;
0272: EngineContext parentCtx = EngineContext
0273: .peekContext(parentProcessInstanceId);
0274: if (parentCtx == null) {
0275: ProcessRepository processRepository = _svcMgr
0276: .getProcessRepository();
0277: try {
0278: parentWorkflow = processRepository
0279: .findWorkflowProcess(parentProcessDefinitionId);
0280: parentActivity = WorkflowUtilities.findActivity(
0281: parentWorkflow, parentActivityInstance
0282: .getActivityDefinitionId());
0283: } catch (ObjectNotFoundException e) {
0284: throw new WMInvalidProcessDefinitionException(
0285: parentProcessDefinitionId);
0286: } catch (RepositoryException e) {
0287: throw new WMWorkflowException(e);
0288: }
0289: } else {
0290: parentWorkflow = parentCtx.getWorkflow();
0291: parentActivity = parentCtx.getActivity();
0292: }
0293: SubFlow subFlow = (SubFlow) parentActivity.getImplementation();
0294:
0295: // Copy the output parameters to the parent process.
0296: ProcessInstance processInstance = _ctx.getProcessInstance();
0297: FormalParameter[] formalParms = _ctx.getWorkflow()
0298: .getFormalParameter();
0299: ActualParameter[] actualParms = subFlow.getActualParameter();
0300: for (int i = 0, n = formalParms.length; i < n; i++) {
0301: FormalParameter fp = formalParms[i];
0302: ActualParameter ap = actualParms[i];
0303: ParameterMode mode = fp.getMode();
0304: if (mode != null && mode != ParameterMode.IN) {
0305: String formalName = fp.getId();
0306: String actualName = ap.getText();
0307: try {
0308: // The formal parameter name is the name of the process
0309: // instance variable in the subflow.
0310: Object result = processInstance
0311: .getAttributeInstance(formalName)
0312: .getValue();
0313:
0314: // The text value of an INOUT or OUT actual parameter is the
0315: // name of the workflow variable to set with the result.
0316: parentProcessInstance.getAttributeInstance(
0317: actualName).setValue(Type.DEFAULT_TYPE,
0318: result);
0319: } catch (ObjectNotFoundException e) {
0320: throw new WMInvalidAttributeException(formalName);
0321: } catch (AttributeReadOnlyException e) {
0322: throw new WMInvalidAttributeException(actualName);
0323: } catch (RepositoryException e) {
0324: throw new WMWorkflowException(e);
0325: }
0326: }
0327: }
0328:
0329: // One way or the other, if the subflow was invoked synchronously,
0330: // the calling activity must be marked complete to allow the parent
0331: // process to proceed. If the parent instance is still on the call
0332: // stack we need take no further action here, because the parent
0333: // will mark the activity complete after this call returns. However,
0334: // if the subflow was invoked synchronously but hit a quiescent
0335: // point before completing, we must resume execution of the parent
0336: // here. The calling activity will only be on the call stack in
0337: // the case of a synchronous straight-through subflow.
0338: EngineContext ctx = null;
0339: try {
0340: if (parentCtx == null) {
0341: // If the parent invoked the subflow synchronously, we must
0342: // now mark the calling activity as complete and resume the
0343: // calling process instance. The WfMC spec. says that in
0344: // async mode the calling activity runs straight through.
0345: if (subFlow.getExecution() != ExecutionType.ASYNCHRONOUS) {
0346: ctx = EngineContext.pushContext(_ctx.getEngine(),
0347: parentWorkflow, parentProcessInstance,
0348: parentActivity, parentActivityInstance,
0349: null, null);
0350: WorkflowRunner runner = new WorkflowRunner(_svcMgr,
0351: ctx);
0352:
0353: // The runner must execute the parent activity immediately,
0354: // because the parent context isn't on the JVM call stack.
0355: runner.completeActivity(parentActivity,
0356: parentActivityInstance, true);
0357: }
0358: }
0359: } finally {
0360: // Restore the subflow's context before returning.
0361: if (ctx != null)
0362: EngineContext.popContext();
0363: }
0364: }
0365:
0366: // Executes a single activity instance then returns.
0367: // N.B. This method must ONLY be called by the main run() loop.
0368: private void executeActivityInstance(Activity activity,
0369: ActivityInstance activityInstance)
0370: throws WMWorkflowException {
0371:
0372: boolean completed;
0373:
0374: // Execute the activity implementation or activity set.
0375: if (activity.getImplementation() == null) {
0376: BlockActivity innerBlkActivity = activity
0377: .getBlockActivity();
0378: if (innerBlkActivity == null) {
0379: Route route = activity.getRoute();
0380: if (route == null) {
0381: // TODO: perform a validating parse.
0382: // Shouldn't happen if parser validates against the schema.
0383: throw new WMInvalidProcessDefinitionException(
0384: "Programming error: activity '"
0385: + activity.getId()
0386: + "' has no implementation in workflow '"
0387: + _ctx.getWorkflow().getId() + '\'');
0388: }
0389: completed = true;
0390: } else {
0391: // We can't always tell at this point whether the block activity
0392: // will run straight to completion. When the activity set's
0393: // exit activity completes, it will either re-enqueue the block
0394: // activity if it contains a loop implementation, or it will
0395: // mark the block activity complete. In cases where the block
0396: // activity finds no work to do it signals immediate completion.
0397: completed = executeBlockActivity(activity,
0398: activityInstance);
0399: }
0400: } else {
0401: completed = executeImplementation(activity,
0402: activityInstance);
0403: }
0404:
0405: // If an auto-finish activity ran to completion, mark it complete and
0406: // thereby let the workflow proceed. Otherwise, we'll have to let the
0407: // client mark the activity complete.
0408: if (completed
0409: && activity.getFinishMode() != AutomationMode.MANUAL) {
0410: completeActivity(activity, activityInstance, false);
0411: }
0412: }
0413:
0414: // Determines whether to perform an(other) iteration of the activity set,
0415: // and if so, enqueues the relevant activities for execution.
0416: private boolean executeBlockActivity(Activity activity,
0417: ActivityInstance blockActivityInstance)
0418: throws WMWorkflowException {
0419:
0420: ActivitySet activitySet = activity.getBlockActivity()
0421: .getActivitySet();
0422:
0423: if (_logger.isDebugEnabled()) {
0424: _logger.debug("Executing activity set: "
0425: + activitySet.getId());
0426: }
0427:
0428: // Implement explicit iteration support here. Theoretically we could
0429: // make any activity loop by adding an obe:Loop extended attribute, but
0430: // our hope is that the WfMC will provide support for looping via the
0431: // same elements nested in BlockActivity (in the xpdl namespace). That
0432: // was the approach I (AMP) discussed with Roberta Norin, the WfMC WG1
0433: // Interface 1 XPDL Spec. Editor.
0434: int loopType = LoopBody.NONE;
0435: LoopBody body = null;
0436: Loop loop = activity.getBlockActivity().getLoop();
0437: if (loop != null) {
0438: body = loop.getBody();
0439: loopType = body.getType();
0440: }
0441:
0442: boolean completed = false;
0443: switch (loopType) {
0444: case LoopBody.NONE:
0445: pushStartActivities(activitySet.getActivity(),
0446: blockActivityInstance.getActivityInstanceId());
0447: break;
0448: case LoopBody.FOR_EACH:
0449: executeForEach(activitySet, blockActivityInstance,
0450: (ForEach) body);
0451: break;
0452: case LoopBody.UNTIL:
0453: pushStartActivities(activitySet.getActivity(),
0454: blockActivityInstance.getActivityInstanceId());
0455: break;
0456: case LoopBody.WHILE:
0457: completed = executeWhile(activitySet,
0458: blockActivityInstance, (While) body);
0459: break;
0460: default:
0461: throw new WMInvalidProcessDefinitionException(
0462: "Programming error: invalid loop type");
0463: }
0464: return completed;
0465: }
0466:
0467: private void executeForEach(ActivitySet activitySet,
0468: ActivityInstance blockActivityInstance, ForEach body)
0469: throws WMWorkflowException {
0470:
0471: try {
0472: // Retrieve the persistent iterator for this block activity.
0473: PersistentIterator iter = blockActivityInstance
0474: .getBlockActivityIterator();
0475:
0476: // If the iterator has lost its content through having been
0477: // serialized, refresh it. If the workflow hasn't quiesced, or if
0478: // a non-persistent, in-memory instance repository is in use, this
0479: // will never happen while a block activity is executing, even if
0480: // there are quiescent breaks in the flow.
0481: if (!iter.hasContent()) {
0482: // Re-evaluate the scope expression, convert the result to an
0483: // array, and set the array into the iterator.
0484: String expr = body.getExpr();
0485: Object result = getEvaluator().evaluateExpression(expr,
0486: _ctx);
0487: iter.setContent(_svcMgr.getDataConverter().toArray(
0488: result));
0489: }
0490:
0491: // If there's more to do, retrieve the next element of the
0492: // iteration, and post the start activities of this block for
0493: // another execution cycle.
0494: Object attrValue = null;
0495: if (iter.hasNext()) {
0496: attrValue = iter.next();
0497: pushStartActivities(activitySet.getActivity(),
0498: blockActivityInstance.getActivityInstanceId());
0499: }
0500:
0501: // Set the value of the iterator process attribute.
0502: AttributeInstance attr = _ctx.getProcessInstance()
0503: .getAttributeInstance(body.getDataField());
0504: attr.setValue(Type.DEFAULT_TYPE, attrValue);
0505: } catch (AttributeReadOnlyException e) {
0506: throw new WMWorkflowException(e);
0507: } catch (EvaluatorException e) {
0508: throw new WMWorkflowException(e);
0509: } catch (RepositoryException e) {
0510: throw new WMWorkflowException(e);
0511: }
0512: }
0513:
0514: private boolean continueUntil(Until body)
0515: throws WMWorkflowException {
0516: try {
0517: Evaluator evaluator = getEvaluator();
0518: Condition cond = body.getCondition();
0519: return evaluator.evaluateCondition(cond.getValue(), _ctx);
0520: } catch (EvaluatorException e) {
0521: throw new WMWorkflowException(e);
0522: }
0523: }
0524:
0525: private boolean continueWhile(While body)
0526: throws WMWorkflowException {
0527: try {
0528: Evaluator evaluator = getEvaluator();
0529: Condition cond = body.getCondition();
0530: return evaluator.evaluateCondition(cond.getValue(), _ctx);
0531: } catch (EvaluatorException e) {
0532: throw new WMWorkflowException(e);
0533: }
0534: }
0535:
0536: private boolean executeWhile(ActivitySet activitySet,
0537: ActivityInstance blockActivityInstance, While body)
0538: throws WMWorkflowException {
0539:
0540: boolean completed;
0541: try {
0542: Evaluator evaluator = getEvaluator();
0543: Condition cond = body.getCondition();
0544: if (evaluator.evaluateCondition(cond.getValue(), _ctx)) {
0545: pushStartActivities(activitySet.getActivity(),
0546: blockActivityInstance.getActivityInstanceId());
0547: completed = false;
0548: } else {
0549: completed = true;
0550: }
0551: } catch (EvaluatorException e) {
0552: throw new WMWorkflowException(e);
0553: }
0554: return completed;
0555: }
0556:
0557: private boolean executeImplementation(Activity activity,
0558: ActivityInstance activityInstance)
0559: throws WMWorkflowException {
0560:
0561: if (_logger.isDebugEnabled()) {
0562: _logger.debug("Executing activity '" + activity.getId()
0563: + "' implementation for instance "
0564: + activityInstance.getActivityInstanceId());
0565: }
0566:
0567: boolean completed;
0568: try {
0569: Implementation implementation = activity
0570: .getImplementation();
0571:
0572: // Subflows are always started by their parent activity, regardless
0573: // of whether the activity is assigned to a performer.
0574: if (implementation instanceof SubFlow) {
0575: completed = executeSubFlow(activity, activityInstance,
0576: (SubFlow) implementation);
0577: } else {
0578: // If there is no performer for this activity, or the performer
0579: // is of type SYSTEM, we must invoke the implementation now.
0580: String performer = activity.getPerformer();
0581: Participant[] participants = performer == null
0582: || performer.length() == 0 ? null
0583: : WorkflowUtilities.findParticipants(_ctx
0584: .getWorkflow(), performer);
0585: boolean execute = participants == null
0586: || participants.length == 1
0587: && participants[0].getParticipantType() == ParticipantType.SYSTEM;
0588:
0589: if (execute) {
0590: if (implementation instanceof ToolSet) {
0591: completed = executeToolSet();
0592: } else {
0593: // 'No' implementation completes immediately unless it
0594: // is a manual-finish activity.
0595: completed = activity.getFinishMode() != AutomationMode.MANUAL;
0596: }
0597: } else {
0598: // Activities with performers require manual execution (and
0599: // possibly completion), regardless of the start mode.
0600: completed = false;
0601: }
0602: }
0603: } catch (RepositoryException e) {
0604: throw new WMWorkflowException(e);
0605: }
0606: return completed;
0607: }
0608:
0609: // TODO: investigate supporting WfXML for 3rd party subflow enactment.
0610: private boolean executeSubFlow(Activity activity,
0611: ActivityInstance parentActivityInstance, SubFlow subFlow)
0612: throws WMWorkflowException, RepositoryException {
0613:
0614: if (_logger.isDebugEnabled()) {
0615: _logger.debug("Executing sub-flow: " + subFlow.getId());
0616: }
0617:
0618: // Make sure the subflow definition is accessible.
0619: String processDefinitionId = subFlow.getId();
0620: WorkflowProcess workflow = WorkflowUtilities
0621: .findWorkflowProcess(activity.getWorkflowProcess()
0622: .getPackage(), processDefinitionId);
0623:
0624: // TODO: provide a means of naming subflows.
0625: // Instantiate the subflow.
0626: ProcessInstance subflowInstance = _ctx.getEngine()
0627: ._createProcessInstance(processDefinitionId, null,
0628: parentActivityInstance.getActivityInstanceId(),
0629: true);
0630:
0631: // Build the parameter list.
0632: Parameter[] parameters = WorkflowEngineUtilities
0633: .createParameters(workflow.getFormalParameter(),
0634: subFlow.getActualParameter(), _ctx);
0635:
0636: // Set the parameters as subflow instance attributes.
0637: try {
0638: for (int i = 0, n = parameters.length; i < n; i++) {
0639: Parameter parm = parameters[i];
0640: if (parm.getMode() != ParameterMode.OUT) {
0641: subflowInstance.getAttributeInstance(
0642: parm.getFormalParm().getId()).setValue(
0643: Type.DEFAULT_TYPE, parm.getValue());
0644: }
0645: }
0646: } catch (AttributeReadOnlyException e) {
0647: throw new WMWorkflowException(e);
0648: }
0649:
0650: // Start the sub-workflow.
0651: boolean completed;
0652: if (subFlow.getExecution() == ExecutionType.ASYNCHRONOUS) {
0653: if (_logger.isDebugEnabled()) {
0654: _logger.debug("Posted workflow '" + processDefinitionId
0655: + "' instance "
0656: + subflowInstance.getProcessInstanceId()
0657: + " for asynchronous execution.");
0658: }
0659:
0660: // Post the sub-flow for asynchronous execution.
0661: // N.B. Spec. says async return values are not supported.
0662: _svcMgr.getAsyncManager().asyncStartProcess(
0663: subflowInstance.getProcessInstanceId());
0664: completed = true;
0665: } else {
0666: completed = _ctx.getEngine().startProcess(subflowInstance);
0667: if (!completed) {
0668: _logger
0669: .info("Synchronous subflow '"
0670: + processDefinitionId
0671: + "' instance "
0672: + subflowInstance
0673: .getProcessInstanceId()
0674: + " did not complete. Switching to asynchronous execution.");
0675: }
0676: }
0677:
0678: return completed;
0679: }
0680:
0681: private boolean executeToolSet() throws WMWorkflowException {
0682: try {
0683: // If this activity has any non-system performers, we must exit and
0684: // allow the participants to execute their work items. Otherwise, we
0685: // can invoke the tools immediately.
0686: Activity activity = _ctx.getActivity();
0687: String performer = activity.getPerformer();
0688: if (performer != null && performer.length() > 0) {
0689: Participant[] participants = WorkflowUtilities
0690: .findParticipants(_ctx.getWorkflow(), performer);
0691: for (int i = 0; i < participants.length; i++) {
0692: if (participants[i].getParticipantType() != ParticipantType.SYSTEM)
0693: return false;
0694: }
0695: }
0696:
0697: // Prepare and execute the tool invocations.
0698: boolean parallel = activity.getToolMode() == ToolMode.PARALLEL;
0699: ToolSet toolSet = (ToolSet) activity.getImplementation();
0700: Tool[] tools = toolSet.getTool();
0701: for (int i = 0; i < tools.length; i++) {
0702: // On the server we only execute PROCEDURES.
0703: if (tools[i].getToolType() == ToolType.APPLICATION)
0704: continue;
0705:
0706: // If tool mode is PARALLEL, all invocations will be prepared
0707: // regardless of the value of the toolIndex parameter we pass.
0708: ToolInvocation[] invocations = WorkflowEngineUtilities
0709: .prepareToolInvocations(null, i, _ctx);
0710: for (int j = 0; j < invocations.length; j++) {
0711: ToolInvocation ti = invocations[j];
0712: try {
0713: _ctx.setTool(tools[i]);
0714: ti.invokeTool(null, true);
0715: } finally {
0716: _ctx.setTool(null);
0717: }
0718: switch (ti.requestAppStatus()) {
0719: case ToolAgent.FINISHED:
0720: WorkflowEngineUtilities.applyResults(
0721: ti.parameters, _ctx
0722: .getProcessInstance(), false);
0723: break;
0724: case ToolAgent.ERROR:
0725: _logger.error("Error invoking PROCEDURE '"
0726: + ti.toolId + '\'', ti.exception);
0727: throw new WMWorkflowException(ti.exception);
0728: default:
0729: return false;
0730: }
0731: }
0732: if (parallel)
0733: break;
0734: }
0735:
0736: return true;
0737: } catch (AttributeReadOnlyException e) {
0738: throw new WMWorkflowException(e);
0739: } catch (RepositoryException e) {
0740: throw new WMWorkflowException(e);
0741: }
0742: }
0743:
0744: /**
0745: * Execute an afferent transition. The method informs the join of which
0746: * transition fired (if any), in response to which the join itself may fire.
0747: * If the join fires, its target activity is pushed onto the stack for
0748: * execution.
0749: *
0750: * @param transition The afferent transition. Can be <code>null</code>.
0751: * @param activity The activity to execute.
0752: * @param activityInstance The activity instance to execute.
0753: */
0754: private void executeJoin(Transition transition, Activity activity,
0755: ActivityInstance activityInstance)
0756: throws WMWorkflowException {
0757:
0758: if (_logger.isDebugEnabled()) {
0759: _logger.debug("Executing transition '" + transition
0760: + "' to activity '" + activity.getId()
0761: + "' instance "
0762: + activityInstance.getActivityInstanceId());
0763: }
0764:
0765: // If this was triggered by a predecessor activity, inform the join
0766: // that the afferent transition has fired, and check whether this
0767: // results in the join itself firing. If it does not fire, we cannot
0768: // proceed.
0769: JoinInstance joinInstance = activityInstance.getJoin();
0770: if (joinInstance != null) {
0771: if (joinInstance.shouldFire(transition.getId())) {
0772: if (_logger.isDebugEnabled()) {
0773: _logger.debug("Join for activity '"
0774: + activity.getId() + "' instance "
0775: + activityInstance.getActivityInstanceId()
0776: + " fired");
0777: }
0778: } else {
0779: if (_logger.isDebugEnabled()) {
0780: _logger.debug("Join for activity '"
0781: + activity.getId() + "' instance "
0782: + activityInstance.getActivityInstanceId()
0783: + " not fired");
0784: }
0785: return;
0786: }
0787: }
0788:
0789: // Place the activity in the open.notRunning state.
0790: _ctx.getEngine().cascadeActivityInstanceState(
0791: _ctx.getWorkflow(), activity, activityInstance,
0792: WMActivityInstanceState.OPEN_NOTRUNNING_INT, false,
0793: WMObjectState.DEFAULT_INT, false, true);
0794:
0795: // If this is an assigned activity, ensure that work items exist.
0796: if (activity.getPerformer() != null) {
0797: _ctx.getEngine().findCreateWorkItems(_ctx.getWorkflow(),
0798: activity, _ctx.getProcessInstance(),
0799: activityInstance, false);
0800: }
0801:
0802: // If it's an auto-start activity, we can proceed.
0803: if (activity.getStartMode() != AutomationMode.MANUAL) {
0804: if (transition.getExecution() == ExecutionType.ASYNCHRONOUS) {
0805: _svcMgr.getAsyncManager().asyncStartActivity(
0806: activityInstance.getProcessInstanceId(),
0807: activityInstance.getActivityInstanceId());
0808: } else {
0809: startActivity(activity, activityInstance, false);
0810: }
0811: }
0812: }
0813:
0814: // Executes an activity's efferent transitions.
0815: private void executeTransitions(Activity activity,
0816: ActivityInstance activityInstance)
0817: throws WMWorkflowException {
0818:
0819: // Get the first transition restriction.
0820: TransitionRestriction restriction = null;
0821: for (int i = 0, n = activity.getTransitionRestriction().length; i < n
0822: && restriction == null; i++) {
0823:
0824: TransitionRestriction tr = activity
0825: .getTransitionRestriction(i);
0826: if (tr.getSplit() != null)
0827: restriction = tr;
0828: }
0829:
0830: // Determine the split type.
0831: SplitType splitType;
0832: if (restriction == null) {
0833: splitType = SplitType.AND;
0834: } else {
0835: Split split = restriction.getSplit();
0836: if (split == null) {
0837: splitType = SplitType.AND;
0838: } else {
0839: splitType = split.getType();
0840: if (_logger.isDebugEnabled()) {
0841: _logger.debug("Activity '" + activity.getId()
0842: + "' split type: " + splitType);
0843: }
0844: }
0845: }
0846:
0847: try {
0848: Map transitions = activity.getEfferentTransitions();
0849:
0850: // Transitions from XOR splits must be evaluated in the order
0851: // specified by the transition references.
0852: String[] txIds;
0853: if (splitType == SplitType.XOR && restriction != null)
0854: txIds = restriction.getSplit().getTransitionReference();
0855: else {
0856: Set keySet = transitions.keySet();
0857: txIds = (String[]) keySet.toArray(new String[keySet
0858: .size()]);
0859: }
0860:
0861: // Evaluate efferent transitions and activate successor activities.
0862: int firedCount = 0;
0863: Transition otherwise = null;
0864: Transition exception = null;
0865: for (int i = 0, n = txIds.length; i < n; i++) {
0866: Transition transition = (Transition) transitions
0867: .get(txIds[i]);
0868: Condition condition = transition.getCondition();
0869: ConditionType type = condition == null ? null
0870: : condition.getType();
0871: if ((condition == null || type == ConditionType.CONDITION)
0872: && executeTransition(transition,
0873: activityInstance, false)) {
0874:
0875: firedCount++;
0876:
0877: // Only one transition in an XOR split can fire.
0878: if (splitType == SplitType.XOR)
0879: break;
0880: } else if (type == ConditionType.OTHERWISE) {
0881: otherwise = transition;
0882: } else if (type == ConditionType.DEFAULTEXCEPTION
0883: || type == ConditionType.EXCEPTION) {
0884:
0885: exception = transition;
0886: }
0887: }
0888:
0889: // If no transitions fired, take the OTHERWISE path.
0890: if (firedCount == 0) {
0891: if (otherwise == null) {
0892: if (exception == null) {
0893: // The PackageValidator should catch this, shouldn't happen.
0894: _logger
0895: .warn("XPDL error - OTHERWISE transition missing on activity '"
0896: + activity.getId()
0897: + "' in workflow '"
0898: + _ctx.getWorkflow().getId()
0899: + '\'');
0900: }
0901: } else {
0902: executeTransition(otherwise, activityInstance,
0903: false);
0904: }
0905: }
0906: } catch (EvaluatorException e) {
0907: throw new WMWorkflowException(e);
0908: }
0909: }
0910:
0911: boolean executeTransition(Transition transition,
0912: ActivityInstance activityInstance, boolean run)
0913: throws EvaluatorException, WMWorkflowException {
0914:
0915: // A transition will always fire unless there's an associated condition
0916: // that evaluates to false.
0917: Condition condition = transition.getCondition();
0918: if (condition == null
0919: || condition.getType() != ConditionType.CONDITION
0920: || getEvaluator().evaluateCondition(
0921: condition.getValue(), _ctx)) {
0922:
0923: // If this was an event or exception transition, the activity will
0924: // not yet have been closed. For synchronous events or exceptions,
0925: // we must close the activity with an appropriate state: an
0926: // exception terminates it, whereas an event completes it. We have
0927: // to duplicate code that also appears in completeActivity(),
0928: // because async transitions do not cause the activity to close.
0929: WMActivityInstanceState state = WMActivityInstanceState
0930: .valueOf(activityInstance.getState());
0931: if (state.isOpen()) {
0932: boolean sync = true;
0933: Activity activity = transition.getFromActivity();
0934: Deadline[] deadlines = activity.getDeadline();
0935: if (deadlines != null) {
0936: for (int i = 0, n = deadlines.length; i < n; i++) {
0937: Deadline deadline = deadlines[i];
0938: if (deadline.getExceptionName().equals(
0939: _ctx.getEvent().getEventType())) {
0940:
0941: sync = deadline.getExecutionType() == ExecutionType.SYNCHRONOUS;
0942: break;
0943: }
0944: }
0945: }
0946:
0947: // If a synchronous event or exception, close the activity.
0948: if (sync) {
0949: // Unsubscribe the activity from further application events.
0950: try {
0951: _svcMgr
0952: .getApplicationEventBroker()
0953: .unsubscribe(
0954: new String[] {
0955: activityInstance
0956: .getProcessDefinitionId(),
0957: activityInstance
0958: .getProcessInstanceId(),
0959: activityInstance
0960: .getActivityInstanceId() },
0961: false);
0962: } catch (RepositoryException e) {
0963: throw new WMWorkflowException(e);
0964: }
0965:
0966: // For NON_BLOCKED workflows we must reset the join state
0967: // now, because otherwise the join will not fire a second
0968: // time, thus preventing cycles from occurring. In
0969: // LOOP_BLOCKED and FULL_BLOCKED modes the join reset is
0970: // unnecessary because the loop controller automatically
0971: // resets all joins within the block before commencing a new
0972: // iteration.
0973: ConformanceClass conformanceClass = _ctx
0974: .getWorkflow().getPackage()
0975: .getConformanceClass();
0976: GraphConformance graphConformance = conformanceClass == null ? null
0977: : conformanceClass.getGraphConformance();
0978: if (graphConformance == null
0979: || graphConformance == GraphConformance.NON_BLOCKED) {
0980:
0981: JoinInstance join = activityInstance.getJoin();
0982: if (join != null)
0983: join.reset();
0984: }
0985:
0986: // Update the activity & work item states.
0987: int activityState;
0988: int workItemState;
0989: if (condition != null
0990: && (condition.getType() == ConditionType.EXCEPTION || condition
0991: .getType() == ConditionType.DEFAULTEXCEPTION)) {
0992:
0993: activityState = WMActivityInstanceState.CLOSED_TERMINATED_INT;
0994: workItemState = WMWorkItemState.CLOSED_TERMINATED_INT;
0995: } else {
0996: activityState = WMActivityInstanceState.CLOSED_COMPLETED_INT;
0997: workItemState = WMWorkItemState.CLOSED_COMPLETED_INT;
0998: }
0999: // Activity is now closed.
1000: activityInstance.setCompletedDate(new Date());
1001:
1002: _ctx.getEngine().cascadeActivityInstanceState(
1003: _ctx.getWorkflow(), _ctx.getActivity(),
1004: activityInstance, activityState, true,
1005: workItemState, false, false);
1006:
1007: // Make sure the process instance activity dates match.
1008: _ctx.getEngine().recomputeDates(
1009: activityInstance.getProcessInstance(),
1010: activityInstance);
1011: }
1012: }
1013:
1014: if (_logger.isDebugEnabled()) {
1015: _logger.debug("Transition '"
1016: + transition.getId()
1017: + "' fired in process instance "
1018: + _ctx.getProcessInstance()
1019: .getProcessInstanceId());
1020: }
1021:
1022: // Find or create the successor activity instance.
1023: Activity toActivity = transition.getToActivity();
1024: ActivityInstance toActivityInstance = findCreateActivityInstance(
1025: toActivity, activityInstance
1026: .getBlockActivityInstanceId(), true);
1027:
1028: // Notify transition listeners. N.B. The TransitionFired event
1029: // cannot occur before the ActivityInstanceCreated event, because
1030: // the former requires the 'to' ActivityInstance.
1031: _svcMgr.getWorkflowEventBroker().fireTransitionFired(
1032: toActivityInstance, transition);
1033:
1034: // Fire the join's afferent transition.
1035: executeJoin(transition, toActivity, toActivityInstance);
1036:
1037: // If necessary, run the activities.
1038: if (run)
1039: run();
1040:
1041: return true;
1042: } else {
1043: if (_logger.isDebugEnabled()) {
1044: _logger.debug("Transition '"
1045: + transition.getId()
1046: + "' "
1047: + transition.getCondition()
1048: + " evaluated to false in process instance "
1049: + _ctx.getProcessInstance()
1050: .getProcessInstanceId());
1051: }
1052: return false;
1053: }
1054: }
1055:
1056: // Checks whether a block activity is complete.
1057: private boolean isBlockActivityComplete(Activity activity,
1058: ActivityInstance activityInstance)
1059: throws WMWorkflowException {
1060:
1061: boolean completed;
1062: BlockActivity outerBlkActivity = activity.getBlockActivity();
1063: int loopType = LoopBody.NONE;
1064: Loop loop = outerBlkActivity.getLoop();
1065: LoopBody body = null;
1066: if (loop != null) {
1067: body = loop.getBody();
1068: loopType = body.getType();
1069: }
1070: switch (loopType) {
1071: case LoopBody.NONE:
1072: // No loop, we're done.
1073: completed = true;
1074: break;
1075: case LoopBody.FOR_EACH:
1076: completed = !activityInstance.getBlockActivityIterator()
1077: .hasNext();
1078: break;
1079: case LoopBody.UNTIL:
1080: completed = !continueUntil((Until) body);
1081: break;
1082: case LoopBody.WHILE:
1083: completed = !continueWhile((While) body);
1084: break;
1085: default:
1086: throw new OBERuntimeException(
1087: "Programming error: invalid loop type");
1088: }
1089: return completed;
1090: }
1091:
1092: private void pushActivity(Activity activity,
1093: ActivityInstance instance) {
1094: _activityStack.push(new ActivityContext(activity, instance));
1095: }
1096:
1097: // Ensures that all start activities in the list have instances. Resets
1098: // activity instance started & completed dates, and state. Posts any
1099: // start activities (i.e., those with no predecessors) for execution.
1100: private void pushStartActivities(Activity[] activities,
1101: String blockActivityInstanceId) throws WMWorkflowException {
1102:
1103: // XPDL 1.0 permits both null and empty Activities elements
1104: if (activities == null || activities.length == 0)
1105: return;
1106:
1107: // Values we'll need repeatedly within the loop.
1108: boolean inBlockActivity = blockActivityInstanceId != null;
1109: WorkflowEngine engine = _ctx.getEngine();
1110: WorkflowProcess workflow = _ctx.getWorkflow();
1111: ProcessInstance processInstance = _ctx.getProcessInstance();
1112:
1113: // First reset all the activity instance entities, and instantiate
1114: // start activities if necessary.
1115: for (int i = 0, n = activities.length; i < n; i++) {
1116: Activity activity = activities[i];
1117:
1118: // If this is a start activity whose instance doesn't exist yet,
1119: // that's okay - we'll create it.
1120: ActivityInstance activityInstance = findCreateActivityInstance(
1121: activity, blockActivityInstanceId, activity
1122: .isStartActivity());
1123:
1124: // If we're re-starting a block activity we must reset the dates,
1125: // states, joins and iterators for all existing activity instances
1126: // within the block. All work items are forced to open.notRunning.
1127: // TODO: eliminate unnecessary update of new activity instance.
1128: if (inBlockActivity && activityInstance != null) {
1129: activityInstance.setStartedDate(null);
1130: activityInstance.setTargetDate(null);
1131: activityInstance.setDueDate(null);
1132: activityInstance.setCompletedDate(null);
1133:
1134: // Make sure the process instance activity dates match.
1135: engine
1136: .recomputeDates(processInstance,
1137: activityInstance);
1138:
1139: engine.cascadeActivityInstanceState(workflow, activity,
1140: activityInstance,
1141: WMActivityInstanceState.OPEN_NOTRUNNING_INT,
1142: true, WMWorkItemState.OPEN_NOTRUNNING_INT,
1143: false, true);
1144:
1145: JoinInstance join = activityInstance.getJoin();
1146: if (join != null)
1147: join.reset();
1148:
1149: PersistentIterator blockActivityIterator = activityInstance
1150: .getBlockActivityIterator();
1151: if (blockActivityIterator != null)
1152: blockActivityIterator.reset();
1153: }
1154:
1155: // If an automatic mode start activity.
1156: if (activity.isStartActivity()) {
1157: // If an assigned activity, ensure that a work item exists.
1158: String performer = activity.getPerformer();
1159: if (performer != null && performer.length() > 0) {
1160: engine.findCreateWorkItems(workflow, activity,
1161: processInstance, activityInstance, false);
1162: }
1163:
1164: // If auto-start, enqueue for immediate execution.
1165: if (activity.getStartMode() != AutomationMode.MANUAL)
1166: startActivity(activity, activityInstance, false);
1167: }
1168: }
1169: }
1170:
1171: // This is the main workflow execution loop.
1172: private void run() throws WMWorkflowException {
1173: if (!_running) {
1174: _running = true;
1175: while (!_activityStack.isEmpty()) {
1176: ActivityContext ap = (ActivityContext) _activityStack
1177: .pop();
1178: _ctx.setActivityContext(ap.activity, ap.instance);
1179: try {
1180: executeActivityInstance(ap.activity, ap.instance);
1181: } catch (WMWorkflowException e) {
1182: handleException(e);
1183: }
1184: }
1185: _running = false;
1186: }
1187: }
1188:
1189: private void handleException(WMWorkflowException e)
1190: throws WMWorkflowException {
1191:
1192: if (!_running)
1193: throw e;
1194:
1195: String exceptionName = e instanceof WMExecuteException ? ((WMExecuteException) e)
1196: .getExceptionName()
1197: : e.getClass().getName();
1198:
1199: if (_logger.isDebugEnabled())
1200: _logger.debug("handleException(" + exceptionName + ')');
1201:
1202: // Search for a matching exception handler transition.
1203: Transition defaultException = null;
1204: Iterator iter = _ctx.getActivity().getEfferentTransitions()
1205: .values().iterator();
1206: while (iter.hasNext()) {
1207: Transition transition = (Transition) iter.next();
1208: Condition condition = transition.getCondition();
1209: if (condition != null) {
1210: switch (condition.getType().value()) {
1211: case ConditionType.EXCEPTION_INT:
1212: // Fire the first matching exception handler transition.
1213: boolean match = false;
1214: String value = condition.getValue();
1215: if (value.indexOf('.') == -1) {
1216: // value is a custom XPDL exception name.
1217: if (exceptionName.equals(value))
1218: match = true;
1219: } else {
1220: // value is a fully qualified exception class name.
1221: // See whether it matches the class of the caught
1222: // exception or that of any nested exception.
1223: try {
1224: Class clazz = Class.forName(value);
1225: Throwable oe, ie = e;
1226: do {
1227: if (clazz.isAssignableFrom(ie
1228: .getClass())) {
1229: match = true;
1230: break;
1231: }
1232: oe = ie;
1233: ie = ie.getCause();
1234: } while (ie != null && ie != oe);
1235: } catch (ClassNotFoundException cnfe) {
1236: // If we can't even load this class it certainly
1237: // can't be a match for the caught exception!
1238: continue;
1239: }
1240: }
1241: if (match) {
1242: if (_logger.isDebugEnabled()) {
1243: _logger.debug("Exception '" + exceptionName
1244: + "' triggered transition "
1245: + transition.getId());
1246: }
1247: try {
1248: executeTransition(transition, _ctx
1249: .getActivityInstance(), false);
1250: } catch (EvaluatorException ee) {
1251: throw new WMWorkflowException(ee);
1252: }
1253: return;
1254: }
1255: break;
1256: case ConditionType.DEFAULTEXCEPTION_INT:
1257: if (defaultException == null) {
1258: defaultException = transition;
1259: } else {
1260: _logger
1261: .warn("Duplicate defaultException transition '"
1262: + transition.getId()
1263: + "' ignored.");
1264: }
1265: break;
1266: }
1267: }
1268: }
1269:
1270: // No default handler - just rethrow the original exception.
1271: if (defaultException == null)
1272: throw e;
1273:
1274: try {
1275: // Otherwise, fire the default exception handler.
1276: if (_logger.isDebugEnabled()) {
1277: _logger.debug("Exception '" + exceptionName
1278: + "' triggered transition "
1279: + defaultException.getId());
1280: }
1281: executeTransition(defaultException, _ctx
1282: .getActivityInstance(), false);
1283: } catch (EvaluatorException ee) {
1284: throw new WMWorkflowException(ee);
1285: }
1286: }
1287:
1288: void startActivity(Activity activity,
1289: ActivityInstance activityInstance, boolean run)
1290: throws WMWorkflowException {
1291:
1292: // If necessary, reset the started, completed, target and due dates. In
1293: // LOOP_BLOCKED and FULL_BLOCKED graph conformance modes this is done
1294: // by the encompassing BlockActivity, in NON_BLOCKED mode we have to
1295: // do it whenever we re-enter an activity.
1296: ConformanceClass conformanceClass = _ctx.getWorkflow()
1297: .getPackage().getConformanceClass();
1298: GraphConformance graphConformance = conformanceClass == null ? null
1299: : conformanceClass.getGraphConformance();
1300: if (activityInstance.getStartedDate() == null
1301: || graphConformance == null
1302: || graphConformance == GraphConformance.NON_BLOCKED) {
1303:
1304: // The activity is now officially started - set started date.
1305: Date startedDate = new Date();
1306: Date targetDate;
1307: Date dueDate;
1308: CalendarFactory calendarFactory = _svcMgr
1309: .getCalendarFactory();
1310: try {
1311: // If the activity has a duration, calculate the target date.
1312: targetDate = WorkflowEngineUtilities
1313: .calculateTargetDate(activity, null, null,
1314: startedDate, calendarFactory);
1315: // If the activity has a limit, calculate the due date.
1316: dueDate = WorkflowEngineUtilities.calculateDueDate(
1317: activity, null, null, startedDate,
1318: calendarFactory);
1319: } catch (RepositoryException e) {
1320: throw new WMWorkflowException(e);
1321: }
1322: activityInstance.setStartedDate(startedDate);
1323: activityInstance.setTargetDate(targetDate);
1324: activityInstance.setDueDate(dueDate);
1325: activityInstance.setCompletedDate(null);
1326:
1327: // Make sure the process instance activity dates match.
1328: _ctx.getEngine().recomputeDates(_ctx.getProcessInstance(),
1329: activityInstance);
1330:
1331: // TODO: Think about event propagation to block activity/process level.
1332: ApplicationEventBroker broker = _svcMgr
1333: .getApplicationEventBroker();
1334: String activityInstanceId = activityInstance
1335: .getActivityInstanceId();
1336:
1337: // Check whether this activity has any event- or exception-type
1338: // efferent transitions and if so, subscribe to these events.
1339: Deadline[] deadlines = activity.getDeadline();
1340: Collection transitions = activity.getEfferentTransitions()
1341: .values();
1342: for (Iterator iter = transitions.iterator(); iter.hasNext();) {
1343: Transition transition = (Transition) iter.next();
1344: Condition condition = transition.getCondition();
1345: Event event = transition.getEvent();
1346: if (event != null) {
1347: // Find the event definition in the workflow or package.
1348: String eventType = event.getId();
1349: EventTypeMetaData metaData;
1350: try {
1351: metaData = _svcMgr.getApplicationEventBroker()
1352: .findEventTypeMetaData(eventType);
1353: } catch (ObjectNotFoundException e) {
1354: throw new WMInvalidProcessDefinitionException(
1355: "Invalid event type: " + eventType);
1356: } catch (RepositoryException e) {
1357: throw new WMWorkflowException(e);
1358: }
1359:
1360: // Subscribe to this event occurrence.
1361: try {
1362: ActualParameter[] actualParms = event
1363: .getActualParameter();
1364: String[] correlationKeys = new String[] {
1365: activity.getWorkflowProcess().getId(),
1366: activityInstance.getProcessInstanceId(),
1367: activityInstanceId, transition.getId() };
1368: if (broker.supportsKeyBasedSubscriptions()) {
1369: Object[] eventKeys = WorkflowEngineUtilities
1370: .createEventKeys(metaData,
1371: actualParms, _ctx);
1372: broker.subscribe(eventType, eventKeys,
1373: event.getPredicate(), null, null,
1374: event.getCount(), correlationKeys,
1375: _ctx);
1376: } else {
1377: // Alternative approach - synthesize an XPath
1378: // expression containing the key values.
1379: String predicate = WorkflowEngineUtilities
1380: .createEventCondition(metaData,
1381: actualParms, _ctx);
1382:
1383: broker.subscribe(eventType, null,
1384: predicate, null, null, event
1385: .getCount(),
1386: correlationKeys, _ctx);
1387: }
1388: } catch (RepositoryException e) {
1389: throw new WMWorkflowException(e);
1390: }
1391: } else if (condition != null
1392: && condition.getType() == ConditionType.EXCEPTION
1393: && deadlines.length > 0) {
1394:
1395: // If there's a transition condition, check whether it's an
1396: // exception-type transition.
1397: String exceptionName = condition.getValue();
1398: for (int j = 0; j < deadlines.length; j++) {
1399: Deadline deadline = deadlines[j];
1400: if (exceptionName.equals(deadline
1401: .getExceptionName())) {
1402: String[] correlationKeys = new String[] {
1403: activityInstance
1404: .getProcessDefinitionId(),
1405: activityInstance
1406: .getProcessInstanceId(),
1407: activityInstanceId,
1408: transition.getId() };
1409:
1410: // TODO: Subscribe to earliest deadline only.
1411: // (If deadlines are all synchronous.)
1412: // Subscribe to this exception occurrence.
1413: try {
1414: Date deadlineDate = WorkflowEngineUtilities
1415: .calculateDeadlineDate(
1416: activity, deadline,
1417: startedDate,
1418: calendarFactory);
1419: broker.subscribe(exceptionName,
1420: deadlineDate, correlationKeys);
1421: } catch (RepositoryException e) {
1422: throw new WMWorkflowException(e);
1423: }
1424: break;
1425: }
1426: }
1427: }
1428: }
1429: }
1430:
1431: // Cascade state update to activity instance and work items. If the
1432: // activity is auto-start, start the work items as well. If it is
1433: // manual start so are its work items; we must wait for the participants
1434: // to start their work items manually. Also notifies listeners.
1435: int workItemState = activity.getStartMode() == AutomationMode.MANUAL ? WMObjectState.DEFAULT_INT
1436: : WMWorkItemState.OPEN_RUNNING_INT;
1437: _ctx.getEngine().cascadeActivityInstanceState(
1438: _ctx.getWorkflow(), activity, activityInstance,
1439: WMActivityInstanceState.OPEN_RUNNING_INT, true,
1440: workItemState, false, true);
1441:
1442: // Push the activity for immediate execution.
1443: pushActivity(activity, activityInstance);
1444:
1445: // Make sure the activities get executed.
1446: if (run)
1447: run();
1448: }
1449:
1450: void continueActivity(Activity activity,
1451: ActivityInstance activityInstance)
1452: throws WMWorkflowException {
1453:
1454: // Push the activity for immediate execution.
1455: pushActivity(activity, activityInstance);
1456: run();
1457: }
1458:
1459: boolean startProcess() throws WMWorkflowException {
1460: ProcessInstance processInstance = _ctx.getProcessInstance();
1461:
1462: if (_logger.isDebugEnabled()) {
1463: _logger.debug("Starting process '"
1464: + processInstance.getProcessDefinitionId()
1465: + "' instance "
1466: + processInstance.getProcessInstanceId());
1467: }
1468:
1469: // Validate that all input parameters have been set.
1470: WorkflowProcess workflow = _ctx.getWorkflow();
1471: FormalParameter[] parms = workflow.getFormalParameter();
1472: if (parms != null) {
1473: Map attrs;
1474: try {
1475: attrs = processInstance.getAttributeInstances();
1476: } catch (RepositoryException e) {
1477: throw new WMWorkflowException(e);
1478: }
1479: for (int i = 0, n = parms.length; i < n; i++) {
1480: FormalParameter fp = parms[i];
1481: String attrName = fp.getId();
1482: AttributeInstance attr = (AttributeInstance) attrs
1483: .get(attrName);
1484: if (attr == null)
1485: throw new WMInvalidAttributeException(attrName);
1486:
1487: if (fp.getMode() != ParameterMode.OUT
1488: && attr.getValue() == null) {
1489:
1490: throw new WMInvalidProcessInstanceException(
1491: "Input parameter '"
1492: + attrName
1493: + "' not set in process instance: "
1494: + _ctx.getProcessInstance()
1495: .getProcessInstanceId());
1496: }
1497: }
1498: }
1499:
1500: Date startedDate = new Date();
1501: Date targetDate;
1502: Date dueDate;
1503: try {
1504: // If the workflow has an estimated duration, calculate the target date.
1505: CalendarFactory calendarFactory = _svcMgr
1506: .getCalendarFactory();
1507: targetDate = WorkflowEngineUtilities.calculateTargetDate(
1508: workflow, startedDate, calendarFactory);
1509: // If the workflow has a limit, calculate the due date.
1510: dueDate = WorkflowEngineUtilities.calculateDueDate(
1511: workflow, startedDate, calendarFactory);
1512: } catch (RepositoryException e) {
1513: throw new WMWorkflowException(e);
1514: }
1515: processInstance.setStartedDate(startedDate);
1516: processInstance.setTargetDate(targetDate);
1517: processInstance.setDueDate(dueDate);
1518:
1519: // Set the process instance status to open.running (also notifies
1520: // process instance listeners).
1521: _ctx.getEngine().cascadeProcessInstanceState(workflow,
1522: processInstance,
1523: WMProcessInstanceState.OPEN_RUNNING_INT, true,
1524: WMObjectState.DEFAULT_INT, false,
1525: WMObjectState.DEFAULT_INT, false, false);
1526:
1527: // Create and post the start activities.
1528: pushStartActivities(workflow.getActivity(), null);
1529:
1530: // Make sure the activities get executed.
1531: run();
1532:
1533: return processInstance.getState() == WMProcessInstanceState.CLOSED_COMPLETED_INT;
1534: }
1535:
1536: // Finds an activity instance by its instance ID.
1537: private ActivityInstance findActivityInstance(
1538: String activityInstanceId)
1539: throws WMInvalidActivityInstanceException {
1540:
1541: try {
1542: InstanceRepository instanceRepository = _svcMgr
1543: .getInstanceRepository();
1544: return instanceRepository
1545: .findActivityInstance(activityInstanceId);
1546: } catch (RepositoryException e) {
1547: throw new WMInvalidActivityInstanceException(
1548: activityInstanceId);
1549: }
1550: }
1551:
1552: // Finds an activity instance by its definition ID, optionally creating it.
1553: private ActivityInstance findCreateActivityInstance(
1554: Activity activity, String blockActivityInstanceId,
1555: boolean create) throws WMWorkflowException {
1556:
1557: ActivityInstance activityInstance = null;
1558: String activityDefinitionId = activity.getId();
1559: for (Iterator iter = _ctx.getProcessInstance()
1560: .getActivityInstances().iterator(); iter.hasNext();) {
1561:
1562: ActivityInstance ai = (ActivityInstance) iter.next();
1563: String baiId = ai.getBlockActivityInstanceId();
1564: if (ai.getActivityDefinitionId().equals(
1565: activityDefinitionId)
1566: && (baiId == null
1567: && blockActivityInstanceId == null || baiId != null
1568: && baiId.equals(blockActivityInstanceId))) {
1569:
1570: activityInstance = ai;
1571: break;
1572: }
1573: }
1574: // If this is a start activity whose instance doesn't exist yet,
1575: // that's okay - we'll create it. We'll also create it if we're
1576: // executing an ActivitySet, because now is the only chance we
1577: // get to link the sub-activity instance to its block activity
1578: // instance.
1579: if (activityInstance == null && create) {
1580: activityInstance = createActivityInstance(activity,
1581: blockActivityInstanceId);
1582: }
1583: return activityInstance;
1584: }
1585:
1586: private ActivityInstance createActivityInstance(Activity activity,
1587: String blockActivityInstanceId) throws WMWorkflowException {
1588:
1589: InstanceRepository instanceRepository = _svcMgr
1590: .getInstanceRepository();
1591:
1592: // If there are one or more afferent transitions then create a join
1593: // instance to control activation of the activity instance.
1594: ActivityInstance activityInstance;
1595: JoinInstance joinInstance = null;
1596: Map trs = activity.getAfferentTransitions();
1597: if (!trs.isEmpty()) {
1598: String[] trIds = new String[trs.size()];
1599: int j = 0;
1600: for (Iterator iter = trs.keySet().iterator(); iter
1601: .hasNext(); j++)
1602: trIds[j] = (String) iter.next();
1603: if (trs.size() == 1) {
1604: // Even activities with a single afferent transition require
1605: // a join instance, so that we can tell when they're ready to
1606: // start (since the XPDL spec. doesn't define a ready state
1607: // between open.notRunning and open.running).
1608: joinInstance = new OrJoinInstance(trIds);
1609: } else if (trs.size() > 1) {
1610: for (int k = 0, n = activity.getTransitionRestriction().length; k < n
1611: && joinInstance == null; k++) {
1612:
1613: Join join = activity.getTransitionRestriction(k)
1614: .getJoin();
1615: if (join != null) {
1616: switch (join.getType().value()) {
1617: case JoinType.AND_INT:
1618: joinInstance = new AndJoinInstance(trIds);
1619: break;
1620: case JoinType.XOR_INT:
1621: joinInstance = new OrJoinInstance(trIds);
1622: break;
1623: }
1624: }
1625: }
1626: }
1627: }
1628:
1629: // If this is a looping block activity, we must store a loop iterator.
1630: PersistentIterator blockActivityIterator = null;
1631: BlockActivity blockActivity = activity.getBlockActivity();
1632: if (blockActivity != null && blockActivity.getLoop() != null)
1633: blockActivityIterator = new PersistentArrayIterator();
1634:
1635: // Calculate the activity priority.
1636: int priority = WorkflowUtilities.findActivityPriority(activity);
1637:
1638: try {
1639: // Create the activity instance.
1640: activityInstance = instanceRepository
1641: .createActivityInstance(
1642: _ctx.getWorkflow().getId(),
1643: _ctx.getProcessInstance()
1644: .getProcessInstanceId(),
1645: activity.getId(),
1646: activity.getName(),
1647: joinInstance,
1648: blockActivityInstanceId,
1649: blockActivityIterator,
1650: priority,
1651: WMActivityInstanceState.OPEN_NOTRUNNING_INT,
1652: EMPTY_PARTICPANTS);
1653:
1654: // Notify activity instance listeners.
1655: _svcMgr.getWorkflowEventBroker()
1656: .fireActivityInstanceCreated(activityInstance,
1657: activity);
1658: } catch (RepositoryException e) {
1659: throw new WMWorkflowException(e);
1660: }
1661: return activityInstance;
1662: }
1663:
1664: private Evaluator getEvaluator() throws WMWorkflowException {
1665: if (_evaluator == null) {
1666: try {
1667: _evaluator = WorkflowEngineUtilities.getEvaluator(_ctx
1668: .getWorkflow().getPackage(), _svcMgr);
1669: } catch (RepositoryException e) {
1670: throw new WMWorkflowException(e);
1671: }
1672: }
1673: return _evaluator;
1674: }
1675: }
|