001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript.jbi.component;
018:
019: import java.io.InputStreamReader;
020: import java.util.Map;
021: import java.util.Stack;
022: import java.util.concurrent.ConcurrentHashMap;
023:
024: import javax.jbi.component.ComponentContext;
025: import javax.jbi.management.DeploymentException;
026: import javax.jbi.messaging.DeliveryChannel;
027: import javax.jbi.messaging.ExchangeStatus;
028: import javax.jbi.messaging.Fault;
029: import javax.jbi.messaging.MessageExchange;
030: import javax.jbi.messaging.MessageExchangeFactory;
031: import javax.jbi.messaging.MessagingException;
032: import javax.jbi.messaging.NormalizedMessage;
033: import javax.jbi.messaging.MessageExchange.Role;
034: import javax.jbi.servicedesc.ServiceEndpoint;
035: import javax.xml.namespace.QName;
036:
037: import org.apache.servicemix.common.BaseLifeCycle;
038: import org.apache.servicemix.common.Endpoint;
039: import org.apache.servicemix.common.ExchangeProcessor;
040: import org.bpmscript.BpmScriptException;
041: import org.mozilla.javascript.Context;
042: import org.mozilla.javascript.Script;
043: import org.mozilla.javascript.Scriptable;
044: import org.mozilla.javascript.ScriptableObject;
045: import org.springframework.core.io.DefaultResourceLoader;
046: import org.springframework.core.io.Resource;
047: import org.springframework.core.io.ResourceLoader;
048:
049: /**
050: * @org.apache.xbean.XBean element="endpoint"
051: */
052: public class CorellationEndpoint extends Endpoint implements
053: ExchangeProcessor {
054:
055: public static class InstanceAndQueueId {
056: public QName service;
057: public String instanceId;
058: public String queueId;
059:
060: public InstanceAndQueueId(QName service, String instanceId,
061: String queueId) {
062: this .service = service;
063: this .instanceId = instanceId;
064: this .queueId = queueId;
065: }
066:
067: public boolean equals(Object o1) {
068: InstanceAndQueueId i1 = (InstanceAndQueueId) o1;
069: return i1.instanceId.equals(instanceId)
070: && i1.queueId.equals(queueId)
071: && this .service.equals(i1.service);
072: }
073:
074: @Override
075: public int hashCode() {
076: return instanceId.hashCode() + queueId.hashCode()
077: + service.hashCode();
078: }
079: }
080:
081: private QName registerOperation = new QName(
082: "http://bpmscript.org/jbi", "register");
083: private QName deregisterOperation = new QName(
084: "http://bpmscript.org/jbi", "deregister");
085:
086: private Map<InstanceAndQueueId, Script> matchers = new ConcurrentHashMap<InstanceAndQueueId, Script>();
087:
088: private ServiceEndpoint activated;
089: private DeliveryChannel channel;
090: private MessageExchangeFactory exchangeFactory;
091: private ResourceLoader resourceLoader = new DefaultResourceLoader();
092: private String library;
093: private Script libraryScript;
094: private Map<String, MessageExchange> exchanges = new ConcurrentHashMap<String, MessageExchange>();
095:
096: /* (non-Javadoc)
097: * @see org.apache.servicemix.common.Endpoint#getRole()
098: */
099: public Role getRole() {
100: return Role.PROVIDER;
101: }
102:
103: public void activate() throws Exception {
104: logger = this .serviceUnit.getComponent().getLogger();
105: ComponentContext ctx = getServiceUnit().getComponent()
106: .getComponentContext();
107: channel = new EndpointDeliveryChannel(ctx.getDeliveryChannel(),
108: this );
109: exchangeFactory = channel.createExchangeFactory();
110: activated = ctx.activateEndpoint(service, endpoint);
111: if (library != null) {
112: Resource resource = resourceLoader.getResource(library);
113: Context cx = Context.enter();
114: try {
115: libraryScript = cx.compileReader(new InputStreamReader(
116: resource.getInputStream()), library, 0, null);
117: } finally {
118: Context.exit();
119: }
120: }
121: start();
122: }
123:
124: public void deactivate() throws Exception {
125: stop();
126: ServiceEndpoint ep = activated;
127: activated = null;
128: ComponentContext ctx = getServiceUnit().getComponent()
129: .getComponentContext();
130: ctx.deactivateEndpoint(ep);
131: }
132:
133: public ExchangeProcessor getProcessor() {
134: return this ;
135: }
136:
137: public void validate() throws DeploymentException {
138: }
139:
140: protected void send(MessageExchange me) throws MessagingException {
141: if (me.getRole() == MessageExchange.Role.CONSUMER
142: && me.getStatus() == ExchangeStatus.ACTIVE) {
143: BaseLifeCycle lf = (BaseLifeCycle) getServiceUnit()
144: .getComponent().getLifeCycle();
145: lf.sendConsumerExchange(me, (Endpoint) this );
146: } else {
147: channel.send(me);
148: }
149: }
150:
151: protected void done(MessageExchange me) throws MessagingException {
152: me.setStatus(ExchangeStatus.DONE);
153: send(me);
154: }
155:
156: protected void fail(MessageExchange me, Exception error)
157: throws MessagingException {
158: me.setError(error);
159: send(me);
160: }
161:
162: public void start() throws Exception {
163: }
164:
165: public void stop() {
166: }
167:
168: public Object evaluate(MessageExchange exchange,
169: NormalizedMessage in) throws MessagingException {
170: return null;
171: }
172:
173: public void process(MessageExchange messageExchange)
174: throws Exception {
175: // 3 messages types, register, deregister
176:
177: if (messageExchange.getStatus() == ExchangeStatus.DONE) {
178: // ignore done messages
179: return;
180: }
181:
182: boolean request = messageExchange.getRole() == Role.PROVIDER;
183: QName operation = messageExchange.getOperation();
184: NormalizedMessage inMessage = MessageUtil
185: .copyIn(messageExchange);
186: if (request && operation != null) {
187:
188: QName service = (QName) inMessage.getProperty("service");
189: String processInstanceId = ((String) ((Stack) messageExchange
190: .getProperty("processInstanceIdStack")).peek());
191:
192: if (operation.equals(registerOperation)) {
193: // register
194: String queueId = (String) messageExchange
195: .getProperty("queueId");
196: String stringExpression = (String) inMessage
197: .getProperty("expression");
198: Context cx = Context.enter();
199: try {
200: Script script = cx.compileString(stringExpression,
201: "expression", 0, null);
202: matchers.put(new InstanceAndQueueId(service,
203: processInstanceId, queueId), script);
204: messageExchange.setMessage(messageExchange
205: .createMessage(), "out");
206: send(messageExchange);
207: } finally {
208: Context.exit();
209: }
210: } else if (operation.equals(deregisterOperation)) {
211: // deregister
212: String queueId = (String) inMessage
213: .getProperty("corellationQueueId");
214: matchers.remove(new InstanceAndQueueId(service,
215: processInstanceId, queueId));
216: messageExchange.setMessage(messageExchange
217: .createMessage(), "out");
218: send(messageExchange);
219: } else {
220: fail(messageExchange, new BpmScriptException(
221: "Unrecognised oepration " + operation));
222: }
223: } else if (request) {
224: Context cx = Context.enter();
225: try {
226: Scriptable scope = cx.initStandardObjects();
227: ScriptableObject.putProperty(scope, "exchange", Context
228: .javaToJS(messageExchange, scope));
229: ScriptableObject.putProperty(scope, "message", Context
230: .javaToJS(inMessage, scope));
231: if (libraryScript != null) {
232: libraryScript.exec(cx, scope);
233: }
234: // message to corellate
235: for (Map.Entry<InstanceAndQueueId, Script> entry : matchers
236: .entrySet()) {
237: boolean matches = false;
238: try {
239: matches = ((Boolean) entry.getValue().exec(cx,
240: scope)).booleanValue();
241: } catch (Throwable t) {
242: logger.error(t, t);
243: }
244: if (matches) {
245: MessageExchange tme = exchangeFactory
246: .createInOutExchange();
247: tme.setService(new QName(
248: "http://bpmscript.org/jbi",
249: "corellation"));
250: tme.setMessage(tme.createMessage(), "in");
251: // Retrieve target
252: tme.getMessage("in").setProperty(
253: "processInstanceId",
254: entry.getKey().instanceId);
255: tme.getMessage("in").setProperty("queueId",
256: entry.getKey().queueId);
257: // Send in to target
258: // channel.send(tme);
259: exchanges.put(tme.getExchangeId(),
260: messageExchange);
261: channel.send(tme);
262: }
263: }
264: } finally {
265: Context.exit();
266: }
267: } else {
268: MessageExchange producerExchange = exchanges
269: .get(messageExchange.getExchangeId());
270: // Send back the result
271: if (messageExchange.getStatus() == ExchangeStatus.DONE) {
272: done(producerExchange);
273: } else if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
274: fail(producerExchange, messageExchange.getError());
275: } else if (messageExchange.getFault() != null) {
276: Fault fault = MessageUtil.copyFault(messageExchange);
277: done(messageExchange);
278: MessageUtil.transferToFault(fault, producerExchange);
279: channel.send(producerExchange);
280: } else if (messageExchange.getMessage("out") != null) {
281: NormalizedMessage out = MessageUtil
282: .copyOut(messageExchange);
283: done(messageExchange);
284: MessageUtil.transferToOut(out, producerExchange);
285: channel.send(producerExchange);
286: } else {
287: done(messageExchange);
288: throw new IllegalStateException("Exchange status is "
289: + ExchangeStatus.ACTIVE
290: + " but has no Out nor Fault message");
291: }
292: }
293: }
294:
295: protected void sendSync(MessageExchange me)
296: throws MessagingException {
297: logger.info("sending me");
298: if (!channel.sendSync(me)) {
299: throw new MessagingException("SendSync failed");
300: }
301: logger.info("received me");
302: }
303:
304: public void setDeregisterOperation(QName deregisterOperation) {
305: this .deregisterOperation = deregisterOperation;
306: }
307:
308: public void setLibrary(String library) {
309: this .library = library;
310: }
311:
312: public void setRegisterOperation(QName registerOperation) {
313: this .registerOperation = registerOperation;
314: }
315:
316: public void setResourceLoader(ResourceLoader resourceLoader) {
317: this.resourceLoader = resourceLoader;
318: }
319:
320: }
|