001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: SequencingProviderProcessor.java 2658 2006-10-27 05:50:21Z elu $
023: */
024:
025: package com.bostechcorp.cbesb.runtime.component.cbr.processors;
026:
027: import java.io.File;
028: import java.util.Iterator;
029: import java.util.Vector;
030:
031: import javax.jbi.JBIException;
032: import javax.jbi.messaging.DeliveryChannel;
033: import javax.jbi.messaging.ExchangeStatus;
034: import javax.jbi.messaging.InOnly;
035: import javax.jbi.messaging.InOut;
036: import javax.jbi.messaging.MessageExchange;
037: import javax.jbi.messaging.MessageExchangeFactory;
038: import javax.jbi.messaging.MessagingException;
039: import javax.jbi.messaging.NormalizedMessage;
040: import javax.jbi.messaging.RobustInOnly;
041: import javax.xml.namespace.QName;
042: import javax.xml.transform.Source;
043: import javax.xml.transform.Transformer;
044: import javax.xml.transform.TransformerException;
045: import javax.xml.transform.TransformerFactory;
046: import javax.xml.transform.dom.DOMResult;
047: import javax.xml.transform.dom.DOMSource;
048:
049: import com.bostechcorp.cbesb.common.runtime.CbesbException;
050: import com.bostechcorp.cbesb.common.runtime.ConfigurationException;
051: import com.bostechcorp.cbesb.common.runtime.DataContentException;
052: import com.bostechcorp.cbesb.common.util.Dom;
053: import com.bostechcorp.cbesb.common.util.ErrorUtil;
054: import com.bostechcorp.cbesb.runtime.cbr.CBRException;
055: import com.bostechcorp.cbesb.runtime.cbr.ExpressionHandler;
056: import com.bostechcorp.cbesb.runtime.cbr.RoutingRuleElement;
057: import com.bostechcorp.cbesb.runtime.cbr.RoutingRulesHandler;
058: import com.bostechcorp.cbesb.runtime.cbr.TargetElement;
059: import com.bostechcorp.cbesb.runtime.cbr.TrxIDHandler;
060: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbProviderProcessor;
061: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ServiceDescriptionHandler;
062: import com.bostechcorp.cbesb.runtime.ccsl.lib.ErrorDb;
063: import com.bostechcorp.cbesb.runtime.ccsl.lib.NormalizedMessageUtil;
064: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.JbiWrapperHandler;
065: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.NormalizedMessageHandler;
066: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.StringSource;
067: import com.bostechcorp.cbesb.runtime.component.cbr.CBREndpoint;
068:
069: import javax.jbi.component.ComponentContext;
070: import javax.jbi.servicedesc.ServiceEndpoint;
071: import org.w3c.dom.Document;
072: import org.w3c.dom.Element;
073: import org.w3c.dom.Node;
074:
075: public class CBRProviderProcessor extends CbProviderProcessor {
076:
077: private static final String PROPERTY_CBR_REPLY_RECORD_ENDPOINTS = "CBR.Reply.RecordEndpoints";
078:
079: private RoutingRulesHandler routingRuleHandler;
080:
081: private MessageExchangeFactory messageExchangeFactory;
082:
083: private DeliveryChannel channel;
084:
085: private CBREndpoint endpoint;
086:
087: /**
088: * @param endpoint
089: */
090: public CBRProviderProcessor(CBREndpoint endpoint) {
091: super (endpoint);
092: this .endpoint = endpoint;
093: }
094:
095: /*
096: * (non-Javadoc)
097: *
098: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#start()
099: */
100: @Override
101: public void start() throws Exception {
102: loadRoutingRules();
103: super .start();
104: }
105:
106: /**
107: * @return the channel
108: */
109: public DeliveryChannel getChannel() {
110: return channel;
111: }
112:
113: /**
114: * @param channel
115: * the channel to set
116: */
117: public void setChannel(DeliveryChannel channel) {
118: this .channel = channel;
119: }
120:
121: /**
122: * @return the messageExchangeFactory
123: */
124: public MessageExchangeFactory getMessageExchangeFactory() {
125: return messageExchangeFactory;
126: }
127:
128: /**
129: * @param messageExchangeFactory
130: * the messageExchangeFactory to set
131: */
132: public void setMessageExchangeFactory(
133: MessageExchangeFactory messageExchangeFactory) {
134: this .messageExchangeFactory = messageExchangeFactory;
135: }
136:
137: /*
138: * (non-Javadoc)
139: *
140: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#processInMessage(javax.xml.namespace.QName,
141: * javax.xml.namespace.QName, javax.jbi.messaging.NormalizedMessage)
142: */
143: @Override
144: protected void processInMessage(QName service, QName operation,
145: NormalizedMessage in, MessageExchange exchange)
146: throws CbesbException, JBIException {
147:
148: NormalizedMessage out = null;
149: processMessageHelper(service, operation, in, out, false,
150: exchange);
151:
152: }
153:
154: /*
155: * (non-Javadoc)
156: *
157: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#processInOutMessage(javax.xml.namespace.QName,
158: * javax.xml.namespace.QName, javax.jbi.messaging.NormalizedMessage,
159: * javax.jbi.messaging.NormalizedMessage, boolean)
160: */
161: @Override
162: protected boolean processInOutMessage(QName service,
163: QName operation, NormalizedMessage in,
164: NormalizedMessage out, boolean optionalOut,
165: MessageExchange exchange) throws CbesbException,
166: JBIException {
167:
168: return processMessageHelper(service, operation, in, out,
169: optionalOut, exchange);
170:
171: }
172:
173: /*
174: * (non-Javadoc)
175: *
176: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#processInOutMessage(javax.xml.namespace.QName,
177: * javax.xml.namespace.QName, javax.jbi.messaging.NormalizedMessage,
178: * javax.jbi.messaging.NormalizedMessage, boolean)
179: */
180:
181: private boolean processMessageHelper(QName service,
182: QName operation, NormalizedMessage in,
183: NormalizedMessage out, boolean optionalOut,
184: MessageExchange exchange) throws CbesbException,
185: JBIException {
186:
187: NormalizedMessageHandler nmhIn = new NormalizedMessageHandler(
188: in);
189: if (nmhIn.getRecordCount() == 0) {
190: logger.warn("CBR - Message contains no records");
191: return false;
192: }
193:
194: // NormalizedMessageHandler nmhOut=null;
195:
196: // if (exchange instanceof InOut)
197: // nmhOut = new NormalizedMessageHandler(out, getProviderSvcDescHandlerInstance());
198:
199: Vector<String> replyEndpoints = new Vector<String>();
200:
201: //Get the first record from the DataEnvelope
202: Source srcRecord = nmhIn.getRecordAtIndex(0);
203: //Get the trxId value from the record
204: String trxId = processTrxId(srcRecord, exchange);
205:
206: // Process RoutingRules
207: loadRoutingRules();
208: int routingRulesCount = routingRuleHandler.size();
209: boolean writeErrorDB = true;
210:
211: for (int i = 0; i < routingRulesCount; i++) {
212: RoutingRuleElement routingRule = routingRuleHandler
213: .getRoutingRuleAtIndex(i);
214: String expression = routingRule.getExpression();
215: String expressionType = routingRule.getExpressionType();
216:
217: if (logger.isDebugEnabled()) {
218: logger.debug("CBR Routing Rule: [" + expression
219: + "] Type: [" + expressionType + "]");
220: logger.debug("CBR TrxId = [" + trxId + "]");
221: }
222:
223: boolean result = true;
224: if (expression != null) {
225: if ("Exact".equals(expressionType)) {
226: result = ExpressionHandler.processExactExpression(
227: trxId, expression);
228: } else if ("RegExp".equals(expressionType)) {
229: result = ExpressionHandler.processRegexpExpression(
230: trxId, expression);
231: } else if ("XPath".equals(expressionType)) {
232: DOMSource domSrc = convertToDomSource(srcRecord);
233: result = ExpressionHandler.processXpathExpression(
234: domSrc, expression);
235: } else {
236: ConfigurationException ce = new ConfigurationException(
237: "CBR - Invalid expression type defined in route - "
238: + expressionType);
239: ce.setRemedy("Fix the configuration setting.");
240: throw ce;
241: }
242:
243: }
244:
245: if (logger.isDebugEnabled())
246: logger.debug("CBR expression result = " + result);
247: if (result) {
248:
249: writeErrorDB = false;
250: int targetsCount = routingRule.size();
251: for (int j = 0; j < targetsCount; j++) {
252: TargetElement target = routingRule
253: .getTargetServiceAtIndex(j);
254:
255: MessageExchange newExchange = null;
256:
257: if (exchange instanceof InOut)
258: newExchange = messageExchangeFactory
259: .createInOutExchange();
260: else if (exchange instanceof RobustInOnly)
261: newExchange = messageExchangeFactory
262: .createRobustInOnlyExchange();
263: else
264: newExchange = messageExchangeFactory
265: .createInOnlyExchange();
266:
267: this .configureTarget(newExchange, target);
268: transferExchangeProperties(exchange, newExchange);
269:
270: NormalizedMessage newInMsg = newExchange
271: .createMessage();
272: ServiceDescriptionHandler svcDescHandler = getSvcDescHandlerForTarget(target);
273: transferMsgContents(in, newInMsg, svcDescHandler);
274:
275: newExchange.setMessage(newInMsg, "in");
276:
277: // Send to corresponding target
278: if (target.getTimeout() < 0)
279: channel.sendSync(newExchange);
280: else
281: channel.sendSync(newExchange, target
282: .getTimeout());
283:
284: //Copy the metadata back into the original exchange
285: transferExchangeProperties(newExchange, exchange);
286:
287: if (newExchange.getError() != null) {
288:
289: // some bad happen, stop the flow.
290: // we need to stop the CBR earlier
291: exchange.setError(newExchange.getError());
292: return false;
293: }
294:
295: if (newExchange instanceof InOut
296: || newExchange instanceof RobustInOnly) {
297:
298: if (newExchange.getFault() != null) {
299: // get a fault; handle it
300: exchange.setFault(newExchange.getFault());
301:
302: /*
303: * should release the newExchange by setting the DONE status.
304: */
305: newExchange.setStatus(ExchangeStatus.DONE);
306: channel.send(newExchange);
307:
308: // we need to stop the CBR earlier
309: return false;
310: } else {
311: if (newExchange instanceof InOut) {
312: //Process the reply
313: NormalizedMessage newOutMsg = ((InOut) newExchange)
314: .getOutMessage();
315: transferMsgContents(
316: newOutMsg,
317: out,
318: getProviderSvcDescHandlerInstance());
319: //Add an entry to the replyEndpoints vector
320: //So an property can be set in the exchange
321: replyEndpoints.add(newExchange
322: .getEndpoint()
323: .getEndpointName());
324:
325: /**
326: * should release the newExchange by setting the DONE status.
327: */
328: if (logger.isDebugEnabled())
329: logger
330: .debug("New Exchange Status : "
331: + newExchange
332: .getStatus()
333: .toString());
334:
335: newExchange
336: .setStatus(ExchangeStatus.DONE);
337: channel.send(newExchange);
338: }
339: }
340: }
341: }
342: }
343: }
344:
345: if (writeErrorDB) {
346: logger
347: .info("The MessageExchange with trxID '"
348: + trxId
349: + "'does not match any expression in CBR. Save the exchange into the error database.");
350: ErrorDb
351: .write(
352: new Exception(
353: "The MessageExchange with trxID '"
354: + trxId
355: + "' does not match any expression in CBR."),
356: exchange, logger);
357: }
358:
359: if (exchange instanceof InOut) {
360:
361: //Create an array containing the endpoint name where each reply
362: //record came from.
363: String[] replyEndpointsProperty = new String[replyEndpoints
364: .size()];
365: for (int i = 0; i < replyEndpoints.size(); i++) {
366: replyEndpointsProperty[i] = replyEndpoints.elementAt(i);
367: }
368: exchange.setProperty(PROPERTY_CBR_REPLY_RECORD_ENDPOINTS,
369: replyEndpointsProperty);
370:
371: return true;
372: } else
373: return false;
374: }
375:
376: /**
377: * Loads the service list from a file.
378: *
379: * @throws Exception
380: */
381: private void loadRoutingRules() throws CbesbException {
382: File routingRulesFile = new File(getEndpoint().getServiceUnit()
383: .getRootPath(), ((CBREndpoint) getEndpoint())
384: .getRoutingRules());
385: logger.debug(" getRoutingRules: "
386: + getEndpoint().getServiceUnit().getRootPath() + " "
387: + ((CBREndpoint) getEndpoint()).getRoutingRules()
388: + "\n");
389: routingRuleHandler = RoutingRulesHandler.load(routingRulesFile);
390: }
391:
392: private String processTrxId(Source srcRecord,
393: MessageExchange exchange) throws CbesbException {
394: CBREndpoint endpoint = (CBREndpoint) getEndpoint();
395: String type = endpoint.getType();
396: String result = null;
397: TrxIDHandler trxIdHandler = TrxIDHandler.instance();
398:
399: if ("fixed".equalsIgnoreCase(type)) {
400: StringSource strSrc = StringSource
401: .convertToStringSource(srcRecord);
402: if (strSrc != null) {
403: String data = strSrc.getText();
404: int offset;
405: int length;
406: try {
407: offset = Integer.parseInt(endpoint.getOffset());
408: length = Integer.parseInt(endpoint.getLength());
409: } catch (NumberFormatException e) {
410: ConfigurationException ce = new ConfigurationException(
411: "Fixed TrxId type does not have valid settings for Offset and Length - "
412: + e.getMessage(), e);
413: ce.setRemedy("Correct the configuration settings");
414: throw ce;
415: }
416: try {
417: result = trxIdHandler.processFixed(data, offset,
418: length);
419: } catch (Exception e) {
420: DataContentException dce = new DataContentException(
421: "Error occurred while processing Fixed TrxId - "
422: + e.getMessage(), e);
423: dce
424: .setRemedy("Verify that the message content is valid.");
425: throw dce;
426: }
427: }
428:
429: } else if ("csv".equalsIgnoreCase(type)) {
430: StringSource strSrc = StringSource
431: .convertToStringSource(srcRecord);
432: if (strSrc != null) {
433: String data = strSrc.getText();
434: String delimiter = endpoint.getDelimiter();
435: if (delimiter == null || delimiter.length() == 0) {
436: ConfigurationException ce = new ConfigurationException(
437: "CSV TrxId type does not have valid setting for Delimiter.");
438: ce.setRemedy("Correct the configuration settings");
439: throw ce;
440: }
441: int index;
442: try {
443: index = Integer.parseInt(endpoint.getIndex());
444: } catch (NumberFormatException e) {
445: ConfigurationException ce = new ConfigurationException(
446: "CSV TrxId type does not have valid setting for Index - "
447: + e.getMessage(), e);
448: ce.setRemedy("Correct the configuration settings");
449: throw ce;
450: }
451: try {
452: result = trxIdHandler.processCSV(data, delimiter,
453: index);
454: } catch (Exception e) {
455: DataContentException dce = new DataContentException(
456: "Error occurred while processing CSV TrxId - "
457: + e.getMessage(), e);
458: dce
459: .setRemedy("Verify that the message content is valid.");
460: throw dce;
461: }
462: }
463:
464: } else if ("x12".equalsIgnoreCase(type)) {
465: StringSource strSrc = StringSource
466: .convertToStringSource(srcRecord);
467: if (strSrc != null) {
468: String data = strSrc.getText();
469: try {
470: result = trxIdHandler.processX12(data);
471: } catch (Exception e) {
472: DataContentException dce = new DataContentException(
473: "Error occurred while processing X12 TrxId - "
474: + e.getMessage(), e);
475: dce
476: .setRemedy("Verify that the message content is valid.");
477: throw dce;
478: }
479: }
480:
481: } else if ("hl7".equalsIgnoreCase(type)) {
482: StringSource strSrc = StringSource
483: .convertToStringSource(srcRecord);
484: if (strSrc != null) {
485: String data = strSrc.getText();
486: try {
487: result = trxIdHandler.processHL7(data);
488: } catch (Exception e) {
489: DataContentException dce = new DataContentException(
490: "Error occurred while processing HL7 TrxId - "
491: + e.getMessage(), e);
492: dce
493: .setRemedy("Verify that the message content is valid.");
494: throw dce;
495: }
496: }
497:
498: } else if ("xpath".equalsIgnoreCase(type)) {
499: //There are 2 modes for xpath. If there is no expression
500: //defined in the endpoint, then don't evaluate a trxId.
501: //The xpath expression will be evaluated on each route rule
502: //instead.
503:
504: StringSource strSrc = StringSource
505: .convertToStringSource(srcRecord);
506: if (strSrc != null) {
507: String expression = endpoint.getExpression();
508: try {
509: result = trxIdHandler.processXPATH(expression,
510: strSrc.getText());
511: } catch (Exception e) {
512: DataContentException dce = new DataContentException(
513: "Error occurred while processing XPath TrxId - "
514: + e.getMessage(), e);
515: dce
516: .setRemedy("Verify that the message content and expression are valid.");
517: throw dce;
518: }
519: }
520:
521: //This code would be more efficient than converting back and forth
522: //between DOM->String->DOM, but an exception is occurring
523: //that I am not sure of the cause.
524: // DOMSource domSrc = convertToDomSource(srcRecord);
525: // if (domSrc != null)
526: // {
527: // String expression = endpoint.getExpression();
528: // result = trxIdHandler.processXPATH(expression, domSrc);
529: // }
530:
531: } else if ("script".equalsIgnoreCase(type)) {
532: try {
533: String scriptEngine = endpoint.getScriptEngine();
534: String scriptClass = endpoint.getScriptClass();
535: // we have the naming convention to use "process" as the method name
536: // String scriptMethod = endpoint.getScriptMethod();
537: String suRootPath = getEndpoint().getServiceUnit()
538: .getRootPath();
539: result = trxIdHandler.processScript(exchange,
540: scriptEngine, scriptClass, "process",
541: suRootPath);
542: } catch (Exception e) {
543: DataContentException dce = new DataContentException(
544: "Error occurred while processing Script TrxId - "
545: + e.getMessage(), e);
546: dce
547: .setRemedy("Check the message content is valid and the script does not contain errors.");
548: throw dce;
549: }
550: } else {
551: ConfigurationException ce = new ConfigurationException(
552: "Invalid TrxId type: " + type,
553: CbesbException.SUBMIT_BUG);
554: ce.setRemedy("Fix the configuration setting.");
555: throw ce;
556: }
557:
558: return result;
559: }
560:
561: protected DOMSource convertToDomSource(Source source) {
562: DOMSource domSrc = null;
563: if (source instanceof DOMSource) {
564: //No conversion necessary
565: domSrc = (DOMSource) source;
566: } else {
567: //Try converting the source into a DOMSource
568: try {
569: DOMResult dr = new DOMResult();
570: TransformerFactory tf = TransformerFactory
571: .newInstance();
572: Transformer t = tf.newTransformer();
573: t.transform(source, dr);
574: domSrc = new DOMSource(dr.getNode());
575: } catch (Exception e) {
576: ErrorUtil.printError(
577: "CBR - Cannot convert data into a DOMSource:",
578: e);
579: }
580:
581: }
582: return domSrc;
583: }
584:
585: public void configureTarget(MessageExchange exchange,
586: TargetElement target) throws MessagingException {
587: if (target.getInterface() == null
588: && target.getService() == null) {
589: throw new MessagingException(
590: "interface or service should be specified");
591: }
592: if (target.getInterface() != null) {
593: exchange.setInterfaceName(target.getInterface());
594: }
595: if (target.getService() != null) {
596: exchange.setService(target.getService());
597: }
598: if (target.getOperation() != null) {
599: exchange.setOperation(target.getOperation());
600: }
601: }
602:
603: protected ServiceDescriptionHandler getSvcDescHandlerForTarget(
604: TargetElement target) {
605: try {
606: ComponentContext context = endpoint.getServiceUnit()
607: .getComponent().getComponentContext();
608: ServiceEndpoint[] endpoints = context
609: .getEndpointsForService(target.getService());
610: if (endpoints.length > 0) {
611: Document wsdlDoc = context
612: .getEndpointDescriptor(endpoints[0]);
613: return ServiceDescriptionHandler.getInstance(wsdlDoc);
614: }
615: } catch (Exception e) {
616: logger
617: .error(
618: "Exception while retrieving target service description:",
619: e);
620: }
621: return null;
622: }
623:
624: private void transferMsgContents(NormalizedMessage source,
625: NormalizedMessage dest,
626: ServiceDescriptionHandler svcDescHandler)
627: throws CbesbException, MessagingException {
628: NormalizedMessageHandler nmhSource = new NormalizedMessageHandler(
629: source);
630: nmhSource.copy(dest, svcDescHandler);
631:
632: for (Iterator it = source.getPropertyNames().iterator(); it
633: .hasNext();) {
634: String name = (String) it.next();
635: dest.setProperty(name, source.getProperty(name));
636: }
637:
638: dest.setSecuritySubject(source.getSecuritySubject());
639: }
640:
641: private void transferExchangeProperties(
642: MessageExchange srcExchange, MessageExchange targetExchange) {
643: for (Iterator it = srcExchange.getPropertyNames().iterator(); it
644: .hasNext();) {
645: String name = (String) it.next();
646: targetExchange.setProperty(name, srcExchange
647: .getProperty(name));
648: }
649: }
650:
651: }
|