001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2007 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: ScheduledProcessHandler.java 12067 2008-02-22 15:07:09Z mpreston $
023: */
024: package com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging;
025:
026: import java.util.Vector;
027:
028: import javax.jbi.JBIException;
029: import javax.jbi.component.ComponentContext;
030: import javax.jbi.messaging.ExchangeStatus;
031: import javax.jbi.messaging.InOnly;
032: import javax.jbi.messaging.InOut;
033: import javax.jbi.messaging.MessageExchange;
034: import javax.jbi.messaging.MessagingException;
035: import javax.jbi.messaging.NormalizedMessage;
036: import javax.jbi.messaging.RobustInOnly;
037: import javax.jbi.servicedesc.ServiceEndpoint;
038:
039: import org.apache.commons.logging.Log;
040: import org.apache.commons.logging.LogFactory;
041:
042: import com.bostechcorp.cbesb.common.constant.MetadataConstants;
043: import com.bostechcorp.cbesb.common.runtime.CbesbException;
044: import com.bostechcorp.cbesb.common.util.ErrorUtil;
045: import com.bostechcorp.cbesb.runtime.ccsl.lib.ErrorDb;
046: import com.bostechcorp.cbesb.runtime.ccsl.lib.ExternalInput;
047: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.FaultHandler;
048: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.SourceHelper;
049: import com.bostechcorp.cbesb.runtime.component.util.wsdl.WsdlMepConstants;
050:
051: /**
052: * This is the base class that should be extended by concrete Binding Components
053: * that support polling or schedules. This acts as the consumer rather than
054: * implementing a concrete ConsumerProvider class.
055: */
056: public abstract class ScheduledProcessHandler {
057:
058: protected final transient Log logger = LogFactory
059: .getLog(getClass());
060:
061: protected ScheduledEndpointProcessor endpoint;
062:
063: protected ServiceDescriptionHandler serviceDescriptionHandler;
064:
065: protected String lastError;
066:
067: protected long startProcessTime;
068:
069: public ScheduledProcessHandler(ScheduledEndpointProcessor endpoint) {
070: this .endpoint = endpoint;
071: }
072:
073: /**
074: * @return the lastError
075: */
076: public String getLastError() {
077: return lastError;
078: }
079:
080: /**
081: * @param lastError
082: * the lastError to set
083: */
084: public void setLastError(String lastError) {
085: this .lastError = lastError;
086: }
087:
088: protected void doStart() throws Exception {
089: }
090:
091: protected void doStop() throws Exception {
092: }
093:
094: public void processRobustInOnly(MessageExchange exchange)
095: throws JBIException {
096: throw new JBIException(
097: getLocalErrorMsg()
098: + "receives an unexpected MessageExchange in Consumer processor :"
099: + exchange.toString());
100:
101: }
102:
103: public void processInOnly(MessageExchange exchange)
104: throws JBIException {
105: throw new JBIException(
106: getLocalErrorMsg()
107: + "receives an unexpected MessageExchange in Consumer processor :"
108: + exchange.toString());
109: }
110:
111: public void processInOut(MessageExchange exchange, boolean optional)
112: throws JBIException {
113: throw new JBIException(
114: getLocalErrorMsg()
115: + "receive an unexpected MessageExchange in Consumer processor :"
116: + exchange.toString());
117: }
118:
119: protected boolean doTriggerProc() throws Exception {
120: this .startProcessTime = System.currentTimeMillis();
121: return doTrigger();
122: }
123:
124: protected abstract boolean doTrigger() throws Exception;
125:
126: protected void doProcessFault(NormalizedMessage nm, String fault)
127: throws Exception {
128:
129: FaultHandler fh = new FaultHandler(fault);
130:
131: logger.error(getLocalErrorMsg() + " reports error: "
132: + fh.getMessage());
133: if (fh.getEndpointString() != null)
134: logger.info("The error occured at endpoint: "
135: + fh.getEndpointString());
136:
137: if (fh.getRemedy() != null) {
138: logger.info("Remedy: " + fh.getRemedy());
139: }
140:
141: if (logger.isDebugEnabled() && fh.getDetail() != null) {
142: logger.debug("Error details: " + fh.getDetail());
143: }
144: }
145:
146: protected abstract void doProcessOut(NormalizedMessage nm,
147: String s, MessageExchange me) throws Exception;
148:
149: private String getLocalErrorMsg() {
150: return "Consumer endpoint '" + endpoint.getEndpoint() + "' ";
151: }
152:
153: /**
154: * The process() method is called from the child consumer processor when it
155: * read data from external connection and turn it into an MessageExchange
156: * and route to NMR.
157: */
158: public void process(Object data, IConsumerHandlerContext chContext) {
159: // logger.debug("Endpoint's DefaultMEP :" + endpoint.getDefaultMep());
160: // if (endpoint.getDefaultMep().equals(WsdlMepConstants.IN_ONLY)) {
161: // handleInOnly(data);
162: // } else if (endpoint.getDefaultMep().equals(WsdlMepConstants.IN_OUT))
163: // {
164: // handleInOut(data);
165: // } else if
166: // (endpoint.getDefaultMep().equals(WsdlMepConstants.ROBUST_IN_ONLY)) {
167: // handleRobustInOnly(data);
168: // } else
169: // throw new JBIException("trying to process unknown MEP
170: // \""+endpoint.getDefaultMep()+"\"");
171:
172: if (logger.isDebugEnabled())
173: logger.debug("Endpoint's DefaultMEP :"
174: + endpoint.getDefaultMep());
175:
176: MessageExchange me = null;
177: try {
178: if (endpoint.getDefaultMep().equals(
179: WsdlMepConstants.IN_ONLY)) {
180: handleInOnly(data, me, chContext);
181: } else if (endpoint.getDefaultMep().equals(
182: WsdlMepConstants.IN_OUT)) {
183: handleInOut(data, me, chContext);
184: } else if (endpoint.getDefaultMep().equals(
185: WsdlMepConstants.ROBUST_IN_ONLY)) {
186: handleRobustInOnly(data, me, chContext);
187: } else {
188:
189: logger.warn("trying to process unknown MEP \""
190: + endpoint.getDefaultMep() + "\"");
191: }
192: this .endpoint.sendMessageProcessedNotification(System
193: .currentTimeMillis()
194: - this .startProcessTime);
195:
196: } catch (JBIException e) {
197: ErrorUtil.printError(getLocalErrorMsg()
198: + " encountered with JBI errors: ", e);
199: ErrorDb.write(e, me, logger);
200:
201: }
202: // catch (CbesbException e2) {
203: // ErrorUtil.printWarn("Consumer endpoint '" + endpoint.getEndpoint() +
204: // "' encountered with errors: ", e2);
205: // }
206:
207: }
208:
209: /**
210: * @deprecated
211: *
212: * The process() method is called from the child consumer processor when it
213: * read data from external connection and turn it into an MessageExchange
214: * and route to NMR.
215: *
216: */
217: public void process(Object data) {
218: process(data, null);
219:
220: }
221:
222: /**
223: *
224: * The transform() method is to transform the data read from connection into
225: * NormalizedMessage in the MessageExchange. This implementation expects
226: * "data" to be an instance of ExternalInput. Subclasses can override this
227: * if they have other requirements. Subclasses can also override this if
228: * they need to copy metadata values into the exchange. Most classes should
229: * still be able to use super.transform() to do the actual data copy.
230: *
231: * @param data
232: * @param me
233: * @throws Exception
234: */
235: protected void transform(Object data, MessageExchange me,
236: IConsumerHandlerContext context) throws Exception {
237: ExternalInput ext = (ExternalInput) data;
238: NormalizedMessage msg = me.createMessage();
239: ext.populateMessage(msg, getSvcDescHandlerInstance());
240: me.setMessage(msg, "in");
241: }
242:
243: protected ServiceDescriptionHandler getSvcDescHandlerInstance() {
244: if (serviceDescriptionHandler == null) {
245: logger
246: .debug("Attempting to retreive Service Unit Descriptor");
247: ServiceUnitDescriptor suDescriptor = endpoint
248: .getServiceUnit().getServiceUnitDescriptor();
249: if (suDescriptor != null) {
250: Vector<ServiceInfo> consumes = suDescriptor
251: .getConsumes();
252: if (consumes.size() > 0) {
253: ServiceInfo svcInfo = consumes.elementAt(0);
254: logger.debug("Target Service Info: "
255: + svcInfo.toString());
256: ComponentContext context = endpoint
257: .getServiceUnit().getComponent()
258: .getComponentContext();
259: serviceDescriptionHandler = ServiceDescriptionHandler
260: .getInstance(svcInfo, context);
261: }
262: }
263: }
264: return serviceDescriptionHandler;
265: }
266:
267: protected boolean handleInOnly(Object data, MessageExchange me,
268: IConsumerHandlerContext chContext) throws JBIException {
269: boolean sended = false;
270:
271: ComponentContext context = endpoint.getServiceUnit()
272: .getComponent().getComponentContext();
273: me = endpoint.getChannel().createExchangeFactory()
274: .createInOnlyExchange();
275: String endpointKey = "{"
276: + endpoint.getService().getNamespaceURI() + "}"
277: + endpoint.getService().getLocalPart() + ":"
278: + endpoint.getEndpoint();
279: me.setProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY,
280: endpointKey);
281: try {
282: transform(data, me, chContext);
283: } catch (Exception e) {
284: // just report an warning. not much you can do
285: ErrorUtil
286: .printError(
287: "Consumer endpoint '"
288: + endpoint.getEndpoint()
289: + "' failed to populate the InOnly MessageExchange ",
290: e);
291: return false;
292: }
293:
294: logger.debug("Consumer endpoint service="
295: + endpoint.getService() + " endpoint="
296: + endpoint.getEndpoint());
297: ServiceEndpoint linkedEndpoint = context.getEndpoint(endpoint
298: .getService(), endpoint.getEndpoint());
299: logger.debug("Got target endpoint " + linkedEndpoint
300: + " service=" + linkedEndpoint.getServiceName()
301: + " endpoint=" + linkedEndpoint.getEndpointName());
302:
303: me.setEndpoint(linkedEndpoint);
304: me.setService(endpoint.getService());
305:
306: // TODO LU : If we change to use send(); the ME will be received from
307: // NMR
308: // we can use the processInonly() method to continue the flow.
309: endpoint.getChannel().sendSync(me);
310: if (ExchangeStatus.DONE.equals(me.getStatus())) {
311: sended = true;
312: }
313: return sended;
314: }
315:
316: protected void handleInOut(Object data, MessageExchange me,
317: IConsumerHandlerContext chContext) throws JBIException {
318:
319: // This was missing from in-out causing a null destination
320: ComponentContext context = endpoint.getServiceUnit()
321: .getComponent().getComponentContext();
322: me = endpoint.getChannel().createExchangeFactory()
323: .createInOutExchange();
324: String endpointKey = "{"
325: + endpoint.getService().getNamespaceURI() + "}"
326: + endpoint.getService().getLocalPart() + ":"
327: + endpoint.getEndpoint();
328: me.setProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY,
329: endpointKey);
330:
331: logger.debug("Consumer endpoint service="
332: + endpoint.getService() + " endpoint="
333: + endpoint.getEndpoint());
334: ServiceEndpoint linkedEndpoint = context.getEndpoint(endpoint
335: .getService(), endpoint.getEndpoint());
336: logger.debug("Got target endpoint " + linkedEndpoint
337: + " service=" + linkedEndpoint.getServiceName()
338: + " endpoint=" + linkedEndpoint.getEndpointName());
339:
340: me.setEndpoint(linkedEndpoint);
341: me.setService(endpoint.getService());
342:
343: try {
344: transform(data, me, chContext);
345: } catch (Exception e) {
346: // just report an warning. not much you can do
347: ErrorUtil.printError(getLocalErrorMsg()
348: + " failed to populate the InOut exchange ", e);
349: return;
350: }
351:
352: // TODO LU : If we change to use send(); the ME will be received from
353: // NMR
354: // we can use the processInOut() method to continue the flow.
355:
356: endpoint.getChannel().sendSync(me);
357:
358: try {
359: // Maybe it had an error in called service
360: if ((ExchangeStatus.ERROR).equals(me.getStatus())
361: || me.getFault() != null) {
362: handleErrorFault(me, data);
363: } else {
364: NormalizedMessage nm = me.getMessage("out");
365: if (nm != null)
366: doProcessOut(nm, me);
367: else {
368:
369: logger
370: .error(getLocalErrorMsg()
371: + "does get the out message returned for InOut exchange");
372: ErrorDb
373: .write(
374: new Exception(
375: getLocalErrorMsg()
376: + "does get the out message returned for InOut exchange"),
377: me, logger);
378: }
379: me.setStatus(ExchangeStatus.DONE);
380: endpoint.getChannel().send(me);
381: }
382:
383: } catch (CbesbException ex) {
384: ErrorUtil.printError(getLocalErrorMsg()
385: + "fails to process the out message. ", ex);
386: ErrorDb.write(ex, me, logger);
387: // throw new JBIException(ex.getMessage());
388: }
389: }
390:
391: protected boolean handleRobustInOnly(Object objMsg,
392: MessageExchange me, IConsumerHandlerContext chContext)
393: throws JBIException {
394: boolean sended = false;
395:
396: ComponentContext context = endpoint.getServiceUnit()
397: .getComponent().getComponentContext();
398:
399: me = endpoint.getChannel().createExchangeFactory()
400: .createRobustInOnlyExchange();
401: String endpointKey = "{"
402: + endpoint.getService().getNamespaceURI() + "}"
403: + endpoint.getService().getLocalPart() + ":"
404: + endpoint.getEndpoint();
405: me.setProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY,
406: endpointKey);
407:
408: logger.debug("Consumer endpoint service="
409: + endpoint.getService() + " endpoint="
410: + endpoint.getEndpoint());
411: ServiceEndpoint linkedEndpoint = context.getEndpoint(endpoint
412: .getService(), endpoint.getEndpoint());
413: logger.debug("Got target endpoint " + linkedEndpoint
414: + " service=" + linkedEndpoint.getServiceName()
415: + " endpoint=" + linkedEndpoint.getEndpointName());
416:
417: me.setEndpoint(linkedEndpoint);
418: me.setService(endpoint.getService());
419:
420: try {
421: transform(objMsg, me, chContext);
422: } catch (Exception e) {
423: ErrorUtil.printError(getLocalErrorMsg()
424: + " failed to populate the RobustInOnly exchange ",
425: e);
426: return false;
427: }
428: endpoint.getChannel().sendSync(me);
429:
430: ExchangeStatus status = me.getStatus();
431:
432: if (status.equals(ExchangeStatus.ERROR)
433: || me.getFault() != null) {
434: // Notify error to external service
435: handleErrorFault(me, objMsg);
436: sended = true;
437: } else if (status.equals(ExchangeStatus.DONE)) {
438: sended = true;
439: }
440: return sended;
441:
442: }
443:
444: /**
445: * Get fault message and do something with it; The child class need to
446: * overwrite this one to provide more action such as write into a File, put
447: * into databasee and etc al.
448: *
449: * @param me
450: * @param data
451: */
452: protected void handleErrorFault(MessageExchange me, Object data)
453: throws JBIException {
454:
455: NormalizedMessage nm = me.getFault();
456:
457: if (nm != null) {
458: try {
459: doProcessFault(nm);
460: } catch (CbesbException e) {
461: throw new JBIException(
462: "Exception in processing fault: ", e);
463: }
464:
465: ErrorDb.write(new Exception("fault:" + nm.toString()), me,
466: logger);
467:
468: me.setStatus(ExchangeStatus.DONE);
469: endpoint.getChannel().send(me);
470:
471: } else {
472: // Must be an error
473: logger.error(me.getError().getMessage());
474: if (logger.isDebugEnabled())
475: logger.debug(me.getError());
476:
477: ErrorDb.write(me.getError(), me, logger);
478:
479: throw new JBIException("General Exception.", me.getError());
480: }
481:
482: }
483:
484: protected void doProcessFault(NormalizedMessage nm)
485: throws JBIException, CbesbException {
486: String fault = null;
487: if (nm != null) {
488: fault = SourceHelper.createString(nm.getContent());
489: }
490: try {
491: doProcessFault(nm, fault);
492: } catch (Exception e) {
493: throw CbesbException.create(e);
494: }
495: }
496:
497: protected void doProcessOut(NormalizedMessage nm, MessageExchange me)
498: throws JBIException, CbesbException {
499: String response = null;
500: if (nm != null) {
501: response = SourceHelper.createString(nm.getContent());
502: }
503: try {
504: doProcessOut(nm, response, me);
505: } catch (Exception e) {
506: throw CbesbException.create(e);
507: }
508:
509: }
510:
511: }
|