001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript.jbi.component;
018:
019: import java.io.IOException;
020: import java.util.HashMap;
021: import java.util.Map;
022: import java.util.Stack;
023:
024: import javax.jbi.component.ComponentContext;
025: import javax.jbi.management.DeploymentException;
026: import javax.jbi.messaging.DeliveryChannel;
027: import javax.jbi.messaging.ExchangeStatus;
028: import javax.jbi.messaging.InOptionalOut;
029: import javax.jbi.messaging.InOut;
030: import javax.jbi.messaging.MessageExchange;
031: import javax.jbi.messaging.MessageExchangeFactory;
032: import javax.jbi.messaging.MessagingException;
033: import javax.jbi.messaging.NormalizedMessage;
034: import javax.jbi.messaging.MessageExchange.Role;
035: import javax.jbi.servicedesc.ServiceEndpoint;
036:
037: import org.apache.commons.logging.Log;
038: import org.apache.servicemix.common.Endpoint;
039: import org.apache.servicemix.common.ExchangeProcessor;
040: import org.bpmscript.BpmScriptException;
041: import org.bpmscript.ExecutorResult;
042: import org.bpmscript.IProcessExecutor;
043: import org.bpmscript.IProcessInstance;
044: import org.bpmscript.IProcessInstanceCallback;
045: import org.bpmscript.IProcessInstanceManager;
046: import org.bpmscript.IProcessManager;
047: import org.bpmscript.IgnoredResult;
048: import org.bpmscript.ProcessState;
049:
050: /**
051: * @org.apache.xbean.XBean element="endpoint"
052: */
053: public class ProcessEndpoint extends Endpoint implements
054: ExchangeProcessor {
055:
056: private ServiceEndpoint activated;
057: private DeliveryChannel channel;
058: private MessageExchangeFactory exchangeFactory;
059: private IProcessInstanceManager processInstanceManager;
060: private IProcessExecutor processExecutor;
061: private String processId;
062: private String sourceResource;
063: private String defaultOperation = "main";
064: private Log log;
065: private Map<String, Object> invocationContext = new HashMap<String, Object>();
066:
067: /* (non-Javadoc)
068: * @see org.apache.servicemix.common.Endpoint#getRole()
069: */
070: public Role getRole() {
071: return Role.PROVIDER;
072: }
073:
074: public void activate() throws Exception {
075: logger = this .serviceUnit.getComponent().getLogger();
076: ComponentContext ctx = getServiceUnit().getComponent()
077: .getComponentContext();
078: channel = new EndpointDeliveryChannel(ctx.getDeliveryChannel(),
079: this );
080: exchangeFactory = channel.createExchangeFactory();
081: activated = ctx.activateEndpoint(service, endpoint);
082: log = getServiceUnit().getComponent().getLogger();
083: invocationContext.put("deliveryChannel", channel);
084: invocationContext.put("componentContext", ctx);
085: invocationContext.put("service", getService());
086: start();
087: }
088:
089: public void deactivate() throws Exception {
090: stop();
091: ServiceEndpoint ep = activated;
092: activated = null;
093: ComponentContext ctx = getServiceUnit().getComponent()
094: .getComponentContext();
095: ctx.deactivateEndpoint(ep);
096: }
097:
098: public ExchangeProcessor getProcessor() {
099: return this ;
100: }
101:
102: public void validate() throws DeploymentException {
103: }
104:
105: protected void send(MessageExchange me) throws MessagingException {
106: channel.send(me);
107: }
108:
109: protected void done(MessageExchange me) throws MessagingException {
110: me.setStatus(ExchangeStatus.DONE);
111: send(me);
112: }
113:
114: protected void fail(MessageExchange me, Exception error)
115: throws MessagingException {
116: me.setError(error);
117: send(me);
118: }
119:
120: public void start() throws Exception {
121: }
122:
123: public void stop() {
124: }
125:
126: public void process(MessageExchange exchange) throws Exception {
127: // The component acts as a provider, this means that another component has requested our service
128: // As this exchange is active, this is either an in or a fault (out are send by this component)
129: if (exchange.getStatus() == ExchangeStatus.DONE) {
130: // ignore done messages
131: return;
132: }
133: Stack processInstanceIdStack = (Stack) exchange
134: .getProperty("processInstanceIdStack");
135: if (processInstanceIdStack == null) {
136: processInstanceIdStack = new Stack();
137: }
138: if (exchange instanceof InOut
139: || exchange instanceof InOptionalOut) {
140: if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
141: // Check here if the mep is supported by this component
142: if (exchange instanceof InOut == false) {
143: throw new UnsupportedOperationException(
144: "Unsupported MEP: " + exchange.getPattern());
145: }
146: // In message
147: if (exchange.getMessage("in") != null) {
148: NormalizedMessage in = exchange.getMessage("in");
149:
150: String processInstanceId = (String) in
151: .getProperty("processInstanceId");
152: if (processInstanceId != null) {
153: // calling existing process instance
154: processInstanceIdStack.push(processInstanceId);
155: log.debug("returning to existing process "
156: + processInstanceId);
157: try {
158: sendMessage(exchange,
159: processInstanceIdStack,
160: processInstanceId);
161: } catch (Throwable e) {
162: log.error(e, e);
163: fail(
164: exchange,
165: e instanceof Exception ? (Exception) e
166: : new RuntimeException(e));
167: }
168: } else {
169: String operation = defaultOperation;
170: String specificOperation = (String) in
171: .getProperty("operation");
172: if (specificOperation != null) {
173: operation = specificOperation;
174: }
175: log.debug("creating new process " + endpoint
176: + " and operation " + operation);
177: String newProcessInstanceId = processInstanceManager
178: .createProcessInstance(
179: processInstanceIdStack.size() > 0 ? (String) processInstanceIdStack
180: .peek()
181: : null, endpoint,
182: operation);
183: log
184: .debug("created new process with instance id "
185: + newProcessInstanceId);
186: processInstanceIdStack = new Stack();
187: processInstanceIdStack
188: .push(newProcessInstanceId);
189: try {
190: sendMessage(exchange,
191: processInstanceIdStack,
192: newProcessInstanceId);
193: } catch (Throwable e) {
194: log.error(e, e);
195: fail(
196: exchange,
197: e instanceof Exception ? (Exception) e
198: : new RuntimeException(e));
199: }
200: }
201:
202: // Fault message
203: } else if (exchange.getFault() != null) {
204: // TODO ... handle the fault
205: exchange.setStatus(ExchangeStatus.DONE);
206: channel.send(exchange);
207: // This is not compliant with the default MEPs
208: } else {
209: throw new IllegalStateException(
210: "Provider exchange is ACTIVE, but no in or fault is provided");
211: }
212: // The component acts as a consumer, this means this exchange is received because
213: // we sent it to another component. As it is active, this is either an out or a fault
214: // If this component does not create / send exchanges, you may just throw an UnsupportedOperationException
215: } else if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
216: // Exchange is finished
217: if (exchange.getStatus() == ExchangeStatus.DONE) {
218: return;
219: // Exchange has been aborted with an exception
220: } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
221: return;
222: // Exchange is active
223: } else {
224: // Out message or fault
225: if (exchange.getMessage("out") != null
226: || exchange.getFault() != null) {
227:
228: // existing process
229: String processInstanceId = (String) processInstanceIdStack
230: .peek();
231: // TODO: configure lease time
232: log.debug("returning to existing process "
233: + processInstanceId);
234: try {
235: sendMessage(exchange,
236: processInstanceIdStack,
237: processInstanceId);
238: exchange.setStatus(ExchangeStatus.DONE);
239: channel.send(exchange);
240: } catch (Throwable e) {
241: log.error(e, e);
242: fail(
243: exchange,
244: e instanceof Exception ? (Exception) e
245: : new RuntimeException(e));
246: }
247: // This is not compliant with the default MEPs
248: } else {
249: throw new IllegalStateException(
250: "Consumer exchange is ACTIVE, but no out or fault is provided");
251: }
252: }
253: // Unknown role
254: } else {
255: throw new IllegalStateException("Unkown role: "
256: + exchange.getRole());
257: }
258: } else {
259: NormalizedMessage inMessage = exchange.getMessage("in");
260: String processInstanceId = (String) inMessage
261: .getProperty("processInstanceId");
262: if (processInstanceId != null) {
263: // calling existing process instance
264: processInstanceIdStack.push(processInstanceId);
265: log.debug("inonly or robust in only inbound "
266: + processInstanceId);
267: try {
268: sendMessage(exchange, processInstanceIdStack,
269: processInstanceId);
270: done(exchange);
271: } catch (Throwable e) {
272: log.error(e, e);
273: fail(exchange,
274: e instanceof Exception ? (Exception) e
275: : new RuntimeException(e));
276: }
277: } else {
278: fail(
279: exchange,
280: new BpmScriptException(
281: "no processInstanceId inMessage property found"));
282: }
283: }
284: }
285:
286: protected ExecutorResult sendMessage(final Object message,
287: final Stack processInstanceIdStack,
288: final String processInstanceId) throws BpmScriptException,
289: IOException, ClassNotFoundException, Throwable {
290:
291: return processInstanceManager.doWithProcessInstance(
292: processInstanceId, new IProcessInstanceCallback() {
293: public ExecutorResult execute(
294: IProcessInstance processInstance)
295: throws Throwable {
296: String state = processInstance.getState();
297: if (ProcessState.COMPLETED.name().equals(state)
298: || ProcessState.FAILED.name().equals(
299: state)) {
300: log.debug("ignored message for "
301: + processInstanceId);
302: return new IgnoredResult(ProcessState
303: .valueOf(state));
304: }
305: return processExecutor.send(
306: processInstanceIdStack,
307: processInstance, message,
308: invocationContext);
309: }
310: });
311:
312: }
313:
314: public String getProcessId() {
315: return processId;
316: }
317:
318: public void setProcessId(String processId) {
319: this .processId = processId;
320: }
321:
322: public IProcessInstanceManager getProcessInstanceManager() {
323: return processInstanceManager;
324: }
325:
326: public void setProcessInstanceManager(
327: IProcessInstanceManager processInstanceManager) {
328: this .processInstanceManager = processInstanceManager;
329: }
330:
331: public String getSourceResource() {
332: return sourceResource;
333: }
334:
335: public void setSourceResource(String sourceResource) {
336: this .sourceResource = sourceResource;
337: }
338:
339: public IProcessExecutor getProcessExecutor() {
340: return processExecutor;
341: }
342:
343: public void setProcessExecutor(IProcessExecutor processExecutor) {
344: this.processExecutor = processExecutor;
345: }
346: }
|