001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.osworkflow;
018:
019: import java.util.Map;
020:
021: import javax.jbi.messaging.InOnly;
022: import javax.jbi.messaging.MessageExchange;
023: import javax.jbi.messaging.RobustInOnly;
024:
025: import com.opensymphony.workflow.InvalidInputException;
026: import com.opensymphony.workflow.InvalidRoleException;
027: import com.opensymphony.workflow.Workflow;
028: import com.opensymphony.workflow.WorkflowException;
029: import com.opensymphony.workflow.basic.BasicWorkflow;
030: import com.opensymphony.workflow.config.DefaultConfiguration;
031:
032: import org.apache.commons.logging.Log;
033: import org.apache.commons.logging.LogFactory;
034:
035: /**
036: * @author lhe
037: */
038: public class OSWorkflow extends Thread {
039:
040: public static final String KEY_EXCHANGE = "exchange";
041:
042: public static final String KEY_IN_MESSAGE = "in-message";
043:
044: public static final String KEY_ENDPOINT = "endpoint";
045:
046: public static final String KEY_CALLER = "caller";
047:
048: public static final String KEY_ASYNC_PROCESSING = "asynchronous";
049:
050: private static Log log = LogFactory.getLog(OSWorkflow.class);
051:
052: private Workflow osWorkflowInstance;
053:
054: private String caller;
055:
056: private String osWorkflowName;
057:
058: private Map map;
059:
060: private int action = -1;
061:
062: private long workflowId = -1L;
063:
064: private boolean finished;
065:
066: private boolean aborted;
067:
068: private OSWorkflowEndpoint endpoint;
069:
070: private MessageExchange exchange;
071:
072: /**
073: * creates and initializes a new workflow object
074: *
075: * @param ep
076: * the endpoint reference
077: * @param workflowName
078: * the unique workflow name as defined in workflows.xml
079: * @param action
080: * the initial action
081: * @param map
082: * the value map
083: * @param caller
084: * the caller
085: * @param exchange
086: * the received message exchange
087: */
088: public OSWorkflow(OSWorkflowEndpoint ep, String workflowName,
089: int action, Map map, String caller, MessageExchange exchange) {
090: super (workflowName);
091: setDaemon(true);
092:
093: this .endpoint = ep; // remember the endpoint which called the osworkflow
094: this .osWorkflowName = workflowName;
095: this .osWorkflowInstance = null;
096: this .action = action;
097: this .map = map;
098: this .caller = caller;
099: this .exchange = exchange;
100:
101: // now fill the transient vars with some useful objects
102: this .map.put(KEY_ENDPOINT, this .endpoint);
103: this .map.put(KEY_CALLER, this .caller);
104: this .map.put(KEY_IN_MESSAGE, this .exchange.getMessage("in"));
105: this .map.put(KEY_EXCHANGE, this .exchange);
106: this .map.put(KEY_ASYNC_PROCESSING,
107: this .exchange instanceof InOnly
108: || this .exchange instanceof RobustInOnly);
109: }
110:
111: /**
112: * initializes the workflow and a default config
113: *
114: * @return the unique workflow id
115: */
116: private long createWorkflow() throws InvalidRoleException,
117: InvalidInputException, WorkflowException {
118: this .osWorkflowInstance = new BasicWorkflow(this .caller);
119: DefaultConfiguration config = new DefaultConfiguration();
120: this .osWorkflowInstance.setConfiguration(config);
121: long wfId = this .osWorkflowInstance.initialize(
122: this .osWorkflowName, this .action, this .map);
123: return wfId;
124: }
125:
126: /*
127: * (non-Javadoc)
128: *
129: * @see java.lang.Thread#run()
130: */
131: @Override
132: public void run() {
133: // call the endpoint method for init actions
134: this .endpoint.preWorkflow();
135:
136: log.debug("Starting workflow...");
137: log.debug("Name: " + this .osWorkflowName);
138: log.debug("Action: " + this .action);
139: log.debug("Caller: " + this .caller);
140: log.debug("Map: " + this .map);
141:
142: // loop as long as there are more actions to do and the workflow is not
143: // finished or aborted
144: while (!finished && !aborted) {
145: // initial creation
146: if (this .osWorkflowInstance == null) {
147: try {
148: this .workflowId = createWorkflow();
149: } catch (Exception ex) {
150: log.error("Error creating the workflow", ex);
151: aborted = true;
152: break;
153: }
154: }
155:
156: // determine the available actions
157: int[] availableActions = this .osWorkflowInstance
158: .getAvailableActions(this .workflowId, this .map);
159:
160: // check if there are more actions available
161: if (availableActions.length == 0) {
162: // no, no more actions available - workflow finished
163: log.debug("No more actions. Workflow is finished...");
164: this .finished = true;
165: } else {
166: // get first available action to execute
167: int nextAction = availableActions[0];
168:
169: log.debug("call action " + nextAction);
170: try {
171: // call the action
172: this .osWorkflowInstance.doAction(this .workflowId,
173: nextAction, this .map);
174: } catch (InvalidInputException iiex) {
175: log.error(iiex);
176: aborted = true;
177: } catch (WorkflowException wfex) {
178: log.error(wfex);
179: aborted = true;
180: }
181: }
182: }
183:
184: log.debug("Stopping workflow...");
185: log.debug("Name: " + this .osWorkflowName);
186: log.debug("Action: " + this .action);
187: log.debug("Caller: " + this .caller);
188: log.debug("Map: " + this .map);
189: log.debug("WorkflowId: " + this .workflowId);
190: log.debug("End state: " + (finished ? "Finished" : "Aborted"));
191:
192: // call the endpoint method for cleanup actions or message exchange
193: this.endpoint.postWorkflow();
194: }
195: }
|