001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: SequencingProviderProcessor.java 12166 2008-03-03 02:18:48Z lzheng $
023: */
024:
025: package com.bostechcorp.cbesb.runtime.component.sequencing.processors;
026:
027: import java.io.File;
028: import java.util.Iterator;
029:
030: import javax.jbi.JBIException;
031: import javax.jbi.component.ComponentContext;
032: import javax.jbi.messaging.DeliveryChannel;
033: import javax.jbi.messaging.ExchangeStatus;
034: import javax.jbi.messaging.InOnly;
035: import javax.jbi.messaging.InOut;
036: import javax.jbi.messaging.MessageExchange;
037: import javax.jbi.messaging.MessageExchangeFactory;
038: import javax.jbi.messaging.MessagingException;
039: import javax.jbi.messaging.NormalizedMessage;
040: import javax.jbi.messaging.RobustInOnly;
041: import javax.jbi.servicedesc.ServiceEndpoint;
042: import javax.xml.namespace.QName;
043:
044: import org.w3c.dom.Document;
045:
046: import com.bostechcorp.cbesb.common.runtime.CbesbException;
047: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbProviderProcessor;
048: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ServiceDescriptionHandler;
049: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.NormalizedMessageHandler;
050: import com.bostechcorp.cbesb.runtime.component.sequencing.SequencingEndpoint;
051:
052: public class SequencingProviderProcessor extends CbProviderProcessor {
053:
054: // Thread-safety issue: the serviceList attribute is thread safe since it is created whe the endpoint starts. And it is read-only
055: // after that.
056: private ServiceList serviceList;
057:
058: private MessageExchangeFactory messageExchangeFactory;
059:
060: private DeliveryChannel channel;
061:
062: private SequencingEndpoint endpoint;
063:
064: /**
065: * @param endpoint
066: */
067: public SequencingProviderProcessor(SequencingEndpoint endpoint) {
068: super (endpoint);
069: this .endpoint = endpoint;
070: }
071:
072: /*
073: * (non-Javadoc)
074: *
075: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#start()
076: */
077: @Override
078: public void start() throws Exception {
079: super .start();
080: loadServiceList();
081: }
082:
083: /**
084: * @return the channel
085: */
086: public DeliveryChannel getChannel() {
087: return channel;
088: }
089:
090: /**
091: * @param channel
092: * the channel to set
093: */
094: public void setChannel(DeliveryChannel channel) {
095: this .channel = channel;
096: }
097:
098: /**
099: * @return the messageExchangeFactory
100: */
101: public MessageExchangeFactory getMessageExchangeFactory() {
102: return messageExchangeFactory;
103: }
104:
105: /**
106: * @param messageExchangeFactory
107: * the messageExchangeFactory to set
108: */
109: public void setMessageExchangeFactory(
110: MessageExchangeFactory messageExchangeFactory) {
111: this .messageExchangeFactory = messageExchangeFactory;
112: }
113:
114: /*
115: * (non-Javadoc)
116: *
117: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#processInMessage(javax.xml.namespace.QName,
118: * javax.xml.namespace.QName, javax.jbi.messaging.NormalizedMessage)
119: */
120: @Override
121: protected void processInMessage(QName service, QName operation,
122: NormalizedMessage in, MessageExchange exchange)
123: throws JBIException, CbesbException {
124:
125: // Process the message, but disregard the response message.
126:
127: int targetCount = serviceList.size();
128: int lastTarget = targetCount - 1;
129: NormalizedMessage currentMsg = in;
130: MessageExchange currentExchange = exchange;
131:
132: for (int i = 0; i < targetCount; i++) {
133: // Get the next Target from the Service List
134: ExchangeTarget target = serviceList
135: .getTargetServiceAtIndex(i);
136: //If it is a last target
137: if (i == lastTarget) {
138: if (exchange instanceof InOnly) {
139: InOnly newExchange = messageExchangeFactory
140: .createInOnlyExchange();
141: target.configureTarget(newExchange);
142: transferExchangeProperties(currentExchange,
143: newExchange);
144:
145: // Create a new message and transfer the contents
146: NormalizedMessage newInMsg = newExchange
147: .createMessage();
148: ServiceDescriptionHandler svcDescHandler = getSvcDescHandlerForTarget(target);
149: transferMsgContents(currentMsg, newInMsg,
150: svcDescHandler);
151:
152: /**
153: * should release the currentExchange by setting the DONE status.
154: */
155: if (i > 0) {
156: currentExchange.setStatus(ExchangeStatus.DONE);
157: channel.send(currentExchange);
158: }
159:
160: newExchange.setInMessage(newInMsg);
161:
162: // Send the exchange
163: channel.sendSync(newExchange, target.getTimeout());
164: } else {
165: RobustInOnly newExchange = messageExchangeFactory
166: .createRobustInOnlyExchange();
167: target.configureTarget(newExchange);
168: transferExchangeProperties(currentExchange,
169: newExchange);
170:
171: // Create a new message and transfer the contents
172: NormalizedMessage newInMsg = newExchange
173: .createMessage();
174: ServiceDescriptionHandler svcDescHandler = getSvcDescHandlerForTarget(target);
175: transferMsgContents(currentMsg, newInMsg,
176: svcDescHandler);
177:
178: /**
179: * should release the currentExchange by setting the DONE status.
180: */
181: if (i > 0) {
182: currentExchange.setStatus(ExchangeStatus.DONE);
183: channel.send(currentExchange);
184: }
185:
186: newExchange.setInMessage(newInMsg);
187:
188: // Send the exchange
189: channel.sendSync(newExchange, target.getTimeout());
190:
191: if (newExchange.getFault() != null) {
192: // fault received; we should report back
193: exchange.setFault(newExchange.getFault());
194:
195: /*
196: * should release the newExchange by setting the DONE status.
197: */
198: newExchange.setStatus(ExchangeStatus.DONE);
199: channel.send(newExchange);
200:
201: }
202:
203: }
204:
205: }
206: //If it is not a last target
207: else {
208: // Create a new InOut exchange and configure it for the target
209: InOut newExchange = messageExchangeFactory
210: .createInOutExchange();
211: target.configureTarget(newExchange);
212: transferExchangeProperties(currentExchange, newExchange);
213:
214: // Create a new message and transfer the contents
215: NormalizedMessage newInMsg = newExchange
216: .createMessage();
217: ServiceDescriptionHandler svcDescHandler = getSvcDescHandlerForTarget(target);
218: transferMsgContents(currentMsg, newInMsg,
219: svcDescHandler);
220:
221: /**
222: * should release the currentExchange by setting the DONE status.
223: */
224: if (i > 0) {
225: currentExchange.setStatus(ExchangeStatus.DONE);
226: channel.send(currentExchange);
227: }
228:
229: newExchange.setInMessage(newInMsg);
230:
231: // Send the exchange
232: channel.sendSync(newExchange, target.getTimeout());
233: if (newExchange.getFault() != null) {
234: // fault received; we should report back
235: exchange.setFault(newExchange.getFault());
236:
237: /*
238: * should release the newExchange by setting the DONE status.
239: */
240: newExchange.setStatus(ExchangeStatus.DONE);
241: channel.send(newExchange);
242:
243: // we need to stop the sequencing earlier
244: break;
245: } else {
246: currentMsg = newExchange.getOutMessage();
247: currentExchange = newExchange;
248: }
249: }
250: }
251:
252: }
253:
254: /*
255: * (non-Javadoc)
256: *
257: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#processInOutMessage(javax.xml.namespace.QName,
258: * javax.xml.namespace.QName, javax.jbi.messaging.NormalizedMessage,
259: * javax.jbi.messaging.NormalizedMessage, boolean)
260: */
261: @Override
262: protected boolean processInOutMessage(QName service,
263: QName operation, NormalizedMessage in,
264: NormalizedMessage out, boolean optionalOut,
265: MessageExchange exchange) throws JBIException,
266: CbesbException {
267:
268: // Process the message and return the result
269:
270: NormalizedMessage currentMsg = in;
271: MessageExchange currentExchange = exchange;
272:
273: int targetCount = serviceList.size();
274:
275: for (int i = 0; i < targetCount; i++) {
276: // Get the next Target from the Service List
277: ExchangeTarget target = serviceList
278: .getTargetServiceAtIndex(i);
279:
280: // Create a new InOut exchange and configure it for the target
281: InOut newExchange = messageExchangeFactory
282: .createInOutExchange();
283: target.configureTarget(newExchange);
284: transferExchangeProperties(currentExchange, newExchange);
285:
286: // Create a new message and transfer the contents
287: NormalizedMessage newInMsg = newExchange.createMessage();
288: ServiceDescriptionHandler svcDescHandler = getSvcDescHandlerForTarget(target);
289: transferMsgContents(currentMsg, newInMsg, svcDescHandler);
290:
291: /**
292: * should release the currentExchange by setting the DONE status.
293: */
294: if (i > 0) {
295: currentExchange.setStatus(ExchangeStatus.DONE);
296: channel.send(currentExchange);
297: }
298:
299: newExchange.setInMessage(newInMsg);
300:
301: // Send the exchange
302: channel.sendSync(newExchange, target.getTimeout());
303:
304: if (newExchange.getFault() != null) {
305: // fault received; we should report back
306: exchange.setFault(newExchange.getFault());
307:
308: /*
309: * should release the newExchange by setting the DONE status.
310: */
311: newExchange.setStatus(ExchangeStatus.DONE);
312: channel.send(newExchange);
313:
314: // we need to stop the sequencing earlier
315: return false;
316: } else {
317: currentMsg = newExchange.getOutMessage();
318: currentExchange = newExchange;
319:
320: if (currentMsg == null) {
321: throw new CbesbException(
322: "Timeout from the target service : "
323: + target.getService());
324: }
325: }
326: }
327: transferExchangeProperties(currentExchange, exchange);
328: ServiceDescriptionHandler svcDescHandler = this
329: .getProviderSvcDescHandlerInstance();
330: transferMsgContents(currentMsg, out, svcDescHandler);
331:
332: return true;
333: }
334:
335: /**
336: * Loads the service list from a file.
337: *
338: * @throws Exception
339: */
340: private void loadServiceList() throws Exception {
341: File serviceListFile = new File(getEndpoint().getServiceUnit()
342: .getRootPath(), ((SequencingEndpoint) getEndpoint())
343: .getServiceList());
344: serviceList = ServiceList.load(serviceListFile);
345: }
346:
347: protected ServiceDescriptionHandler getSvcDescHandlerForTarget(
348: ExchangeTarget target) {
349: try {
350: ComponentContext context = endpoint.getServiceUnit()
351: .getComponent().getComponentContext();
352: ServiceEndpoint[] endpoints = context
353: .getEndpointsForService(target.getService());
354: if (endpoints.length > 0) {
355: Document wsdlDoc = context
356: .getEndpointDescriptor(endpoints[0]);
357: return ServiceDescriptionHandler.getInstance(wsdlDoc);
358: }
359: } catch (Exception e) {
360: logger
361: .error(
362: "Exception while retrieving target service description:",
363: e);
364: }
365: return null;
366: }
367:
368: private void transferMsgContents(NormalizedMessage source,
369: NormalizedMessage dest,
370: ServiceDescriptionHandler svcDescHandler)
371: throws CbesbException, MessagingException {
372: NormalizedMessageHandler nmhSource = new NormalizedMessageHandler(
373: source);
374: nmhSource.copy(dest, svcDescHandler);
375:
376: for (Iterator it = source.getPropertyNames().iterator(); it
377: .hasNext();) {
378: String name = (String) it.next();
379: dest.setProperty(name, source.getProperty(name));
380: }
381:
382: dest.setSecuritySubject(source.getSecuritySubject());
383: }
384:
385: private void transferExchangeProperties(
386: MessageExchange srcExchange, MessageExchange targetExchange) {
387: for (Iterator it = srcExchange.getPropertyNames().iterator(); it
388: .hasNext();) {
389: String name = (String) it.next();
390: targetExchange.setProperty(name, srcExchange
391: .getProperty(name));
392: }
393: }
394:
395: }
|