001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.drools.model;
018:
019: import javax.jbi.component.ComponentContext;
020: import javax.jbi.messaging.DeliveryChannel;
021: import javax.jbi.messaging.ExchangeStatus;
022: import javax.jbi.messaging.Fault;
023: import javax.jbi.messaging.InOnly;
024: import javax.jbi.messaging.MessageExchange;
025: import javax.jbi.messaging.MessagingException;
026: import javax.jbi.messaging.NormalizedMessage;
027:
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030: import org.apache.servicemix.JbiConstants;
031: import org.apache.servicemix.client.ServiceMixClient;
032: import org.apache.servicemix.client.ServiceMixClientFacade;
033: import org.apache.servicemix.common.EndpointSupport;
034: import org.apache.servicemix.drools.DroolsEndpoint;
035: import org.apache.servicemix.jbi.jaxp.StringSource;
036: import org.apache.servicemix.jbi.resolver.URIResolver;
037: import org.apache.servicemix.jbi.util.MessageUtil;
038: import org.drools.FactHandle;
039: import org.drools.WorkingMemory;
040:
041: /**
042: * A helper class for use inside a rule to forward a message to an endpoint
043: *
044: * @version $Revision: 426415 $
045: */
046: public class JbiHelper {
047:
048: private DroolsEndpoint endpoint;
049: private Exchange exchange;
050: private WorkingMemory memory;
051: private FactHandle exchangeFactHandle;
052:
053: public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange,
054: WorkingMemory memory) {
055: this .endpoint = endpoint;
056: this .exchange = new Exchange(exchange, endpoint
057: .getNamespaceContext());
058: this .memory = memory;
059: this .exchangeFactHandle = this .memory
060: .assertObject(this .exchange);
061: }
062:
063: public DroolsEndpoint getEndpoint() {
064: return endpoint;
065: }
066:
067: public ComponentContext getContext() {
068: return endpoint.getContext();
069: }
070:
071: public DeliveryChannel getChannel() throws MessagingException {
072: return getContext().getDeliveryChannel();
073: }
074:
075: public ServiceMixClient getClient() {
076: return new ServiceMixClientFacade(getContext());
077: }
078:
079: public Exchange getExchange() {
080: return exchange;
081: }
082:
083: public Log getLogger() {
084: return LogFactory.getLog(memory.getRuleBase().getPackages()[0]
085: .getName());
086: }
087:
088: /**
089: * Forwards the inbound message to the given
090: *
091: * @param uri
092: * @param localPart
093: */
094: /*
095: public void forward(String uri) throws MessagingException {
096: if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
097: MessageExchange me = getChannel().createExchangeFactory().createExchange(exchange.getPattern());
098: URIResolver.configureExchange(me, getContext(), uri);
099: MessageUtil.transferToIn(in, me);
100: getChannel().sendSync(me);
101: } else {
102: throw new MessagingException("Only InOnly and RobustInOnly exchanges can be forwarded");
103: }
104: }
105: */
106: public void route(String uri) throws MessagingException {
107: routeTo(null, uri);
108: }
109:
110: public void routeTo(String content, String uri)
111: throws MessagingException {
112: MessageExchange me = this .exchange.getInternalExchange();
113: String correlationId = (String) exchange
114: .getProperty(JbiConstants.CORRELATION_ID);
115: NormalizedMessage in = null;
116: if (content == null) {
117: in = me.getMessage("in");
118: } else {
119: in = me.createMessage();
120: in.setContent(new StringSource(content));
121: }
122: MessageExchange newMe = getChannel().createExchangeFactory()
123: .createExchange(me.getPattern());
124: URIResolver.configureExchange(newMe, getContext(), uri);
125: MessageUtil.transferToIn(in, newMe);
126: // Set the sender endpoint property
127: String key = EndpointSupport.getKey(endpoint);
128: newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
129: newMe.setProperty(JbiConstants.CORRELATION_ID, correlationId);
130: getChannel().sendSync(newMe);
131: if (newMe.getStatus() == ExchangeStatus.DONE) {
132: me.setStatus(ExchangeStatus.DONE);
133: getChannel().send(me);
134: } else if (newMe.getStatus() == ExchangeStatus.ERROR) {
135: me.setStatus(ExchangeStatus.ERROR);
136: me.setError(newMe.getError());
137: getChannel().send(me);
138: } else {
139: if (newMe.getFault() != null) {
140: MessageUtil.transferFaultToFault(newMe, me);
141: } else {
142: MessageUtil.transferOutToOut(newMe, me);
143: }
144: getChannel().sendSync(me);
145: }
146: update();
147: }
148:
149: public void routeToDefault(String content)
150: throws MessagingException {
151: routeTo(content, endpoint.getDefaultRouteURI());
152: }
153:
154: public void fault(String content) throws Exception {
155: MessageExchange me = this .exchange.getInternalExchange();
156: if (me instanceof InOnly) {
157: me.setError(new Exception(content));
158: getChannel().send(me);
159: } else {
160: Fault fault = me.createFault();
161: fault.setContent(new StringSource(content));
162: me.setFault(fault);
163: getChannel().sendSync(me);
164: }
165: update();
166: }
167:
168: public void answer(String content) throws Exception {
169: MessageExchange me = this .exchange.getInternalExchange();
170: NormalizedMessage out = me.createMessage();
171: out.setContent(new StringSource(content));
172: me.setMessage(out, "out");
173: getChannel().sendSync(me);
174: update();
175: }
176:
177: protected void update() {
178: this.memory
179: .modifyObject(this.exchangeFactHandle, this.exchange);
180: }
181:
182: }
|