001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: * $Id: EndpointProcessor.java 12050 2008-02-22 05:48:33Z lzheng $
022: */
023:
024: package com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging;
025:
026: import java.net.URI;
027:
028: import javax.jbi.JBIException;
029: import javax.jbi.component.ComponentContext;
030: import javax.jbi.messaging.DeliveryChannel;
031: import javax.jbi.messaging.ExchangeStatus;
032: import javax.jbi.messaging.Fault;
033: import javax.jbi.messaging.InOnly;
034: import javax.jbi.messaging.InOptionalOut;
035: import javax.jbi.messaging.InOut;
036: import javax.jbi.messaging.MessageExchange;
037: import javax.jbi.messaging.MessageExchangeFactory;
038: import javax.jbi.messaging.MessagingException;
039: import javax.jbi.messaging.NormalizedMessage;
040: import javax.jbi.messaging.RobustInOnly;
041: import javax.jbi.messaging.MessageExchange.Role;
042: import javax.jbi.servicedesc.ServiceEndpoint;
043: import javax.xml.namespace.QName;
044: import javax.xml.transform.Source;
045:
046: import org.apache.commons.logging.Log;
047: import org.apache.commons.logging.LogFactory;
048:
049: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.FaultHandler;
050: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.SourceHelper;
051:
052: /**
053: * The abtract Endpoint class for ChainBuilder ESB components. It provides the base functions for
054: * dealing with both consumer and provider endpoints in JBI component. The component developer
055: * should extend this class and overide the methods only if it is necessary to change basic behaviors.
056: *
057: * It hides the great complexity of JBI Spec and API from component developers.
058: *
059: *
060: * @author elu
061: *
062: */
063: public abstract class EndpointProcessor extends CbEndpoint implements
064: IExchangeProcessor {
065: protected final transient Log logger = LogFactory
066: .getLog(getClass());
067:
068: protected ServiceEndpoint activated;
069: protected DeliveryChannel channel;
070: protected MessageExchangeFactory exchangeFactory;
071: protected Role role;
072: protected URI defaultMep;
073: protected QName defaultOperation;
074:
075: protected IComponentProcessor providerProc;
076: protected IComponentProcessor consumerProc;
077:
078: /* (non-Javadoc)
079: * @see org.apache.servicemix.common.Endpoint#getRole()
080: */
081: public Role getRole() {
082: return role;
083: }
084:
085: public void setRole(Role role) {
086: this .role = role;
087: }
088:
089: public URI getDefaultMep() {
090: return defaultMep;
091: }
092:
093: public void setDefaultMep(URI defaultMep) {
094: this .defaultMep = defaultMep;
095: }
096:
097: public QName getDefaultOperation() {
098: return defaultOperation;
099: }
100:
101: public void setDefaultOperation(QName defaultOperation) {
102: this .defaultOperation = defaultOperation;
103: }
104:
105: /**
106: * @return the channel
107: */
108: public DeliveryChannel getChannel() {
109: return channel;
110: }
111:
112: /**
113: * @return the exchangeFactory
114: */
115: public MessageExchangeFactory getExchangeFactory() {
116: return exchangeFactory;
117: }
118:
119: /* (non-Javadoc)
120: * @see org.servicemix.common.Endpoint#activate()
121: */
122: public void activate() throws Exception {
123: /*
124: * start() can be overridden in the subclass so that it can initialize
125: */
126: start();
127:
128: /*
129: * Activate an internal JBI endpoint and create the appropriate processors
130: */
131: ComponentContext ctx = this .serviceUnit.getComponent()
132: .getComponentContext();
133: channel = ctx.getDeliveryChannel();
134: exchangeFactory = channel.createExchangeFactory();
135: activated = ctx.activateEndpoint(service, endpoint);
136: if (getRole() == Role.PROVIDER) {
137: providerProc = createProviderProcessor();
138: providerProc.start();
139: } else {
140: consumerProc = createConsumerProcessor();
141: consumerProc.start();
142: // The consumer endpoint also needs an provider processor for accept the MessageExchange in DONE or ERROR status
143: // It is not meaned to handle the ACTIVE MessageExchange
144: providerProc = createProviderProcessor();
145: providerProc.start();
146: }
147: }
148:
149: /* (non-Javadoc)
150: * @see org.servicemix.common.Endpoint#deactivate()
151: */
152: public void deactivate() throws Exception {
153: ComponentContext ctx = this .serviceUnit.getComponent()
154: .getComponentContext();
155: ServiceEndpoint ep = activated;
156: activated = null;
157: ctx.deactivateEndpoint(ep);
158: if (getRole() == Role.PROVIDER) {
159: providerProc.stop();
160: } else {
161: providerProc.stop();
162: consumerProc.stop();
163: }
164:
165: /*
166: * stop() can be overridden in the subclass so that it can clean up
167: */
168: stop();
169: }
170:
171: protected abstract IComponentProcessor createProviderProcessor()
172: throws Exception;
173:
174: protected abstract IComponentProcessor createConsumerProcessor()
175: throws Exception;
176:
177: protected ServiceEndpoint createExternalEndpoint() {
178: return null;
179: }
180:
181: public IExchangeProcessor getProcessor() {
182: return this ;
183: }
184:
185: public abstract void start() throws Exception;
186:
187: public abstract void stop() throws Exception;
188:
189: /*
190: * The process() method is called when a MessageExchange is accepted from NMR
191: */
192: public void process(MessageExchange exchange) throws Exception {
193: long startProcessTime = System.currentTimeMillis();
194:
195: if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
196: if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
197: processAsProvider(exchange);
198: } else if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
199: throw new JBIException(
200: "Receive an unexpected MessageExchange in Consumer endpoint:"
201: + exchange.toString());
202: }
203: if (exchange.getFault() == null)
204: postProcess(startProcessTime);
205: } else {
206: // not an ACTIVE status
207: logger
208: .debug("Ignoring a non-active MessageExchange, status="
209: + exchange.getStatus());
210: }
211:
212: }
213:
214: protected void postProcess(long startProcessTime) {
215:
216: }
217:
218: protected void processAsProvider(MessageExchange exchange)
219: throws Exception {
220:
221: if (exchange instanceof InOnly) {
222: providerProc.processInOnly(exchange);
223: } else if (exchange instanceof RobustInOnly) {
224: providerProc.processRobustInOnly(exchange);
225: } else if (exchange instanceof InOut) {
226: providerProc.processInOut(exchange, false);
227: } else if (exchange instanceof InOptionalOut) {
228: providerProc.processInOut(exchange, true);
229: } else {
230: Exception e = new JBIException(
231: "MessageExchangePattern not recognized :"
232: + exchange.getPattern());
233: exchange.setError(e);
234: channel.send(exchange);
235: throw e;
236: }
237:
238: channel.send(exchange);
239:
240: }
241:
242: /**
243: * If the received exchange is a fault response from the consumer,
244: * acknowledge it : status DONE
245: *
246: * @param ex
247: * @return true if the ack. of the Fault is done; false if no fault is
248: * present in the exchange
249: * @throws Exception
250: */
251: protected boolean ackFaultReception(MessageExchange ex)
252: throws Exception {
253: boolean result = false;
254:
255: if (ex.getFault() != null) {
256: ex.setStatus(ExchangeStatus.DONE);
257:
258: result = true;
259: }
260: return result;
261: }
262:
263: /**
264: * Creates a Fault from an exception
265: *
266: * @param e
267: * @return
268: */
269: public Fault createFault(Exception e, MessageExchange exchange)
270: throws MessagingException {
271: Fault f = exchange.createFault();
272: FaultHandler fh = new FaultHandler(e, (CbEndpoint) this );
273:
274: // String faultString = SourceHelper.createSoapFault(e, null);
275: String faultString = fh.toSoap();
276: Source content = SourceHelper.createSource(faultString);
277: f.setContent(content);
278: exchange.setFault(f);
279: return f;
280: }
281:
282: /**
283: * Creates a Fault from an exception
284: *
285: * @param e
286: * @return
287: */
288: public Fault createRecoverableFault(Exception e,
289: MessageExchange exchange) throws MessagingException {
290: // Fault f = exchange.createFault();
291: //// String faultString = SourceHelper.createRecoverableFault(e, null);
292: //
293: // Source content = SourceHelper.createSource(faultString);
294: // f.setContent(content);
295: // exchange.setFault(f);
296: // return f;
297: return createFault(e, exchange);
298: }
299:
300: /**
301: * Creates a NormalizedMessage for Out
302: *
303: * @return
304: */
305: protected NormalizedMessage createNormalizedMessage(
306: MessageExchange exchange) throws MessagingException {
307: return exchange.createMessage();
308: }
309:
310: // protected void logError(Throwable e) {
311: // if (logger != null)
312: // logger.error( e.getMessage(), e);
313: // else
314: // e.printStackTrace();
315: // }
316:
317: }
|