001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: DeliveryHandler.java 9472 2007-10-09 15:24:17Z elu $
023: */
024: package com.bostechcorp.cbesb.runtime.ccsl.lib;
025:
026: import java.util.LinkedList;
027: import java.util.ListIterator;
028:
029: import javax.jbi.component.ComponentContext;
030: import javax.jbi.messaging.DeliveryChannel;
031: import javax.jbi.messaging.ExchangeStatus;
032: import javax.jbi.messaging.MessageExchange;
033: import javax.jbi.messaging.MessagingException;
034: import javax.jbi.messaging.NormalizedMessage;
035: import javax.jbi.messaging.MessageExchange.Role;
036:
037: import org.apache.commons.logging.Log;
038:
039: import com.bostechcorp.cbesb.common.constant.MetadataConstants;
040:
041: public class DeliveryHandler {
042: private CcslConfig config;
043:
044: public DeliveryHandler(CcslConfig config) {
045: this .config = config;
046: }
047:
048: public MessageExchange accept(ComponentContext componentContext,
049: DeliveryChannel channel) throws MessagingException {
050: return acceptHandler(componentContext, channel, -1l);
051: }
052:
053: public MessageExchange accept(ComponentContext componentContext,
054: DeliveryChannel channel, long waitTime)
055: throws MessagingException {
056: return acceptHandler(componentContext, channel, waitTime);
057: }
058:
059: private MessageExchange acceptHandler(
060: ComponentContext componentContext, DeliveryChannel channel,
061: long waitTime) throws MessagingException {
062: Log log = config.getLog();
063: boolean saveErrors = config.getDefaultSaveErrors();
064: boolean stripRecord = config.getDefaultStripRecord();
065: boolean useSendMessage = config.getUseSendMessage();
066: EndpointConfig endpoint = null;
067: UpocConfig postAcceptUpoc = null;
068:
069: // try to get an exchange
070: MessageExchange me = null;
071: if (waitTime < 0)
072: me = channel.accept();
073: else
074: me = channel.accept(waitTime);
075:
076: // do UPOC and other processing
077: if (me != null && me.getStatus() == ExchangeStatus.ACTIVE) {
078: log
079: .debug("acceptHandler: MessageExchange.getStatus() == ACTIVE");
080: NormalizedMessage msg = null;
081: String whichMessage = null;
082: String destinationEndpointKey = null;
083: try {
084: // The provider role processes the in message
085: if (me.getRole() == MessageExchange.Role.PROVIDER) {
086: whichMessage = "in";
087: destinationEndpointKey = me.getEndpoint()
088: .getServiceName()
089: + ":" + me.getEndpoint().getEndpointName();
090: ;
091: } else if (me.getRole() == MessageExchange.Role.CONSUMER) {
092: whichMessage = "out";
093: destinationEndpointKey = (String) me
094: .getProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY);
095: } else {
096: MessagingException e = new MessagingException(
097: "invalid role: " + me.getRole());
098: log.error(e + "\n"
099: + ExceptionUtil.stackTraceString(e)
100: + "\n\n\n");
101: throw e;
102: }
103: log.debug("acceptHandler: using \"" + whichMessage
104: + "\" message");
105: msg = me.getMessage(whichMessage);
106: } catch (Exception e) {
107: log.error("Exception in acceptHandler(): "
108: + e.getMessage());
109: if (log.isDebugEnabled()) {
110: log.debug("Exception in acceptHandler():", e);
111: }
112: }
113:
114: log.debug("acceptHandler: destinationEndpointKey = "
115: + destinationEndpointKey);
116: if (destinationEndpointKey != null
117: && destinationEndpointKey.length() > 0) {
118: // Try to find the endpoint configuration
119: endpoint = config
120: .getEndpointConfig(destinationEndpointKey);
121: if (endpoint != null) {
122: log.debug("acceptHandler: found endpoint settings");
123: saveErrors = endpoint.getSaveErrors();
124: stripRecord = endpoint.getStripRecord();
125: useSendMessage = endpoint.getSendMessage();
126: postAcceptUpoc = endpoint.getUpoc("postaccept");
127: log.debug("acceptHandler: useSendMessage = "
128: + useSendMessage);
129: } else
130: log
131: .debug("acceptHandler: no endpoint settings, using component defaults");
132:
133: log.debug("acceptHandler: postAcceptUpoc="
134: + postAcceptUpoc + " saveErrors=" + saveErrors
135: + " stripRecord=" + stripRecord
136: + " useSendMessage=" + useSendMessage);
137:
138: if (postAcceptUpoc != null) {
139: // Run UPOCs
140: try {
141: // run startup if required
142: if (endpoint.getNeedToRunStart()) {
143: UpocConfig startup = endpoint
144: .getUpoc("start");
145: if (startup != null) {
146: CcslUpoc.runScript(log, startup
147: .getRootDir(), "start", startup
148: .getType(), startup
149: .getClassName(), startup
150: .getMethod(), componentContext,
151: channel, me, null, this );
152: endpoint.setNeedToRunStart(false);
153: endpoint.setNeedToRunStop(true);
154: }
155: }
156: // run postAccept
157: CcslUpoc.runScript(log, postAcceptUpoc
158: .getRootDir(), "postaccept",
159: postAcceptUpoc.getType(),
160: postAcceptUpoc.getClassName(),
161: postAcceptUpoc.getMethod(),
162: componentContext, channel, me, null,
163: this );
164: } catch (Exception e) {
165: log.error("UPOC exception : " + e.getMessage());
166: if (log.isDebugEnabled()) {
167: log.debug("UPOC exception :", e);
168: }
169: if (saveErrors)
170: ErrorDb.write(e, me, config.getLog());
171: MessagingException eThrow;
172: if (e instanceof MessagingException)
173: eThrow = (MessagingException) e;
174: else
175: eThrow = new MessagingException(e);
176: throw eThrow;
177: }
178: }
179:
180: }
181:
182: // Strip XML records from the data envelope
183: if (stripRecord)
184: RecordHelper.stripXmlRecord(msg);
185:
186: // add the SendMessage to the message content.
187: if (useSendMessage
188: && (me.getRole() == MessageExchange.Role.CONSUMER)) {
189: log.debug("acceptHandler: add sendMessage response");
190: RecordHelper.addSendMessage(msg, me, true);
191: }
192: }
193: return me;
194: }
195:
196: public void send(ComponentContext componentContext,
197: DeliveryChannel channel, MessageExchange exchange)
198: throws MessagingException {
199: sendOrSendSync(false, -1L, componentContext, channel, exchange);
200: }
201:
202: public boolean sendSync(ComponentContext componentContext,
203: DeliveryChannel channel, MessageExchange exchange)
204: throws MessagingException {
205: return sendOrSendSync(true, -1L, componentContext, channel,
206: exchange);
207: }
208:
209: public boolean sendSync(ComponentContext componentContext,
210: DeliveryChannel channel, MessageExchange exchange,
211: long timeout) throws MessagingException {
212: return sendOrSendSync(true, timeout, componentContext, channel,
213: exchange);
214: }
215:
216: private boolean sendOrSendSync(boolean isSendSync,
217: long syncTimeout, ComponentContext componentContext,
218: DeliveryChannel channel, MessageExchange exchange)
219: throws MessagingException {
220: boolean result = true;
221: boolean saveError = config.getDefaultSaveErrors();
222: boolean addRecord = config.getDafaultAddRecord();
223: boolean useSendMessage = config.getUseSendMessage();
224: EndpointConfig endpoint = null;
225: UpocConfig preSendUpoc = null;
226: UpocConfig postSendUpoc = null;
227: LinkedList<MessageExchange> sendList = new LinkedList<MessageExchange>();
228: Log log = config.getLog();
229:
230: // Try to get the sender endpoint from exchange properties
231: String senderEndpointKey = null;
232: log.debug("sendOrSendSync: exchange.getStatus() is "
233: + exchange.getStatus());
234: if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
235: if (exchange.getRole() == Role.CONSUMER) {
236: log
237: .debug("sendOrSendSync: exchange.getRole() is Role.CONSUMER.");
238: // The consumer role must extract the source endpoint from a Servicemix property.
239: // This is not JBI compliant but JBI gives us no way to determine the sender endpoint.
240: senderEndpointKey = (String) exchange
241: .getProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY);
242: } else {
243: log
244: .debug("sendOrSendSync: exchange.getRole() is not Role.CONSUMER.");
245: // The provider role uses the target endpoint
246: senderEndpointKey = exchange.getEndpoint()
247: .getServiceName()
248: + ":"
249: + exchange.getEndpoint().getEndpointName();
250: }
251: } else {
252: // if the exchange is not active just pass it to the channel.
253: if (isSendSync) {
254: if (syncTimeout == -1L)
255: result = channel.sendSync(exchange);
256: else
257: result = channel.sendSync(exchange, syncTimeout);
258: } else {
259: channel.send(exchange);
260: }
261: return result;
262: }
263:
264: log.debug("sendOrSendSync: senderEndpointKey = "
265: + senderEndpointKey);
266: if (senderEndpointKey != null && senderEndpointKey.length() > 0) {
267: // Try to find the endpoint configuration
268: endpoint = config.getEndpointConfig(senderEndpointKey);
269: if (endpoint != null) {
270: saveError = endpoint.getSaveErrors();
271: addRecord = endpoint.getAddRecord();
272: useSendMessage = endpoint.getSendMessage();
273: log
274: .debug("sendOrSendSync: endpoint config useSendMessage = "
275: + useSendMessage);
276: preSendUpoc = endpoint.getUpoc("presend");
277: if (isSendSync)
278: postSendUpoc = endpoint.getUpoc("postsend");
279: }
280: }
281:
282: // Convert <SendMessage> envelope into nmh style.
283: if (useSendMessage && exchange.getRole() == Role.CONSUMER) {
284: log.debug("sendOrSendSync: ready to stripSendMessage");
285: RecordHelper.stripSendMessage(exchange.getMessage("in"),
286: exchange, false);
287: }
288:
289: // add the XML record to the message content
290: if (addRecord) {
291: if (exchange.getRole() == Role.PROVIDER) {
292: RecordHelper.addXmlRecord(exchange.getMessage("out"));
293: } else {
294: RecordHelper.addXmlRecord(exchange.getMessage("in"));
295: }
296: }
297:
298: // if there is no presend UPOC then the sendList is just the original exchange
299: if (preSendUpoc == null) {
300: sendList.add(exchange);
301: } else {
302: // Run UPOCs
303: try {
304: // run startup if required
305: if (endpoint.getNeedToRunStart()) {
306: UpocConfig startup = endpoint.getUpoc("start");
307: if (startup != null) {
308: CcslUpoc.runScript(log, startup.getRootDir(),
309: "start", startup.getType(), startup
310: .getClassName(), startup
311: .getMethod(), componentContext,
312: channel, exchange, null, this );
313: endpoint.setNeedToRunStart(false);
314: endpoint.setNeedToRunStop(true);
315: }
316: }
317: // run presend and get sendList
318: Object rtn = CcslUpoc.runScript(log, preSendUpoc
319: .getRootDir(), "presend",
320: preSendUpoc.getType(), preSendUpoc
321: .getClassName(), preSendUpoc
322: .getMethod(), componentContext,
323: channel, exchange, null, this );
324: if (rtn instanceof LinkedList)
325: sendList = (LinkedList) rtn;
326: else if (rtn instanceof MessageExchange) {
327: sendList = new LinkedList<MessageExchange>();
328: sendList.add((MessageExchange) rtn);
329: }
330: } catch (Exception e) {
331: log.error("UPOC exception : " + e.getMessage());
332: if (log.isDebugEnabled()) {
333: log.debug("UPOC exception :", e);
334: }
335: // e.printStackTrace();
336: if (saveError)
337: ErrorDb.write(e, exchange, config.getLog());
338: MessagingException me;
339: if (e instanceof MessagingException)
340: me = (MessagingException) e;
341: else
342: me = new MessagingException(e);
343: throw me;
344: }
345: }
346:
347: // Send out everything in the send list
348: MessagingException sendException = null;
349: String originalId = exchange.getExchangeId();
350: MessageExchange originalExchange = null;
351: for (ListIterator iter = sendList.listIterator(); iter
352: .hasNext()
353: || originalExchange != null;) {
354: // save the original exchange for last so it comes back as the response for in-out
355: MessageExchange sendExchange = null;
356: if (iter.hasNext()) {
357: sendExchange = (MessageExchange) iter.next();
358: if (sendExchange.getExchangeId().equals(originalId)) {
359: originalExchange = sendExchange;
360: continue;
361: }
362: } else {
363: sendExchange = originalExchange;
364: originalExchange = null;
365: }
366: // send the exchange
367: try {
368: if (isSendSync) {
369: if (syncTimeout == -1L)
370: result = channel.sendSync(sendExchange);
371: else
372: result = channel.sendSync(sendExchange,
373: syncTimeout);
374: if (postSendUpoc != null) {
375: // Run postsend UPOC
376: try {
377: CcslUpoc.runScript(log, postSendUpoc
378: .getRootDir(), "postsend",
379: postSendUpoc.getType(),
380: postSendUpoc.getClassName(),
381: postSendUpoc.getMethod(),
382: componentContext, channel,
383: exchange, null, this );
384: } catch (Exception e) {
385: log.error("UPOC exception : "
386: + e.getMessage());
387: if (log.isDebugEnabled()) {
388: log.debug("UPOC exception :", e);
389: }
390: if (saveError)
391: ErrorDb.write(e, exchange, config
392: .getLog());
393: MessagingException me;
394: if (e instanceof MessagingException)
395: me = (MessagingException) e;
396: else
397: me = new MessagingException(e);
398: throw me;
399: }
400: }
401: } else {
402: channel.send(sendExchange);
403: }
404: } catch (Exception e) {
405: if (saveError)
406: ErrorDb.write(e, sendExchange, config.getLog());
407: if (sendException != null) {
408: // if this is not the first exception then log the previous one
409: log.error(e.toString() + "\n"
410: + ExceptionUtil.stackTraceString(e)
411: + "\n\n\n");
412: }
413: if (e instanceof MessagingException)
414: sendException = (MessagingException) e;
415: else
416: sendException = new MessagingException(e);
417: }
418: }
419: // throw the last exception if there was one
420: if (sendException != null)
421: throw sendException;
422: return result;
423: }
424: }
|