001:/*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: ConsumerProcessor.java 1206 2006-09-23 03:51:32Z elu $
023: */
024:package com.bostechcorp.cbesb.runtime.component.jms.processors;
025:
026:import java.io.ByteArrayInputStream;
027:import java.io.ByteArrayOutputStream;
028:import java.io.IOException;
029:import java.io.StringReader;
030:import java.util.Enumeration;
031:import java.util.Iterator;
032:import java.util.concurrent.atomic.AtomicBoolean;
033:import java.util.concurrent.atomic.AtomicInteger;
034:
035:import javax.jbi.JBIException;
036:import javax.jbi.messaging.MessageExchange;
037:import javax.jbi.messaging.NormalizedMessage;
038:import javax.jms.BytesMessage;
039:import javax.jms.JMSException;
040:import javax.jms.Message;
041:import javax.jms.TextMessage;
042:import javax.resource.spi.work.Work;
043:import javax.resource.spi.work.WorkException;
044:import javax.xml.transform.TransformerConfigurationException;
045:import javax.xml.transform.TransformerException;
046:
047:import org.apache.commons.logging.Log;
048:import org.apache.commons.logging.LogFactory;
049:
050:import com.bostechcorp.cbesb.common.runtime.CbesbException;
051:import com.bostechcorp.cbesb.common.runtime.ConfigurationException;
052:import com.bostechcorp.cbesb.common.runtime.ResourcesConnectionException;
053:import com.bostechcorp.cbesb.common.util.ErrorUtil;
054:import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbConsumerProcessor;
055:import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.StatusConstants;
056:import com.bostechcorp.cbesb.runtime.ccsl.lib.ExternalInput;
057:import com.bostechcorp.cbesb.runtime.ccsl.messages.FaultMessage.FaultMessageTypes;
058:import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.FaultHandler;
059:import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.SourceHelper;
060:import com.bostechcorp.cbesb.runtime.component.jms.JMSEndpoint;
061:import com.bostechcorp.cbesb.runtime.component.jms.JMSMarshaler;
062:import com.bostechcorp.cbesb.runtime.component.util.wsdl.WsdlMepConstants;
063:import com.bostechcorp.cbesb.runtime.jms.JMSConsumerHandler;
064:import com.bostechcorp.cbesb.runtime.jms.JMSHandler;
065:
066:public class ConsumerProcessor extends CbConsumerProcessor {
067:
068: protected final transient Log logger = LogFactory.getLog(getClass());
069:
070: protected AtomicBoolean running = new AtomicBoolean(false);
071:
072:
073: //TODO Eric Lu : please pay attention to the thread-safety issue here for using any attributes
074: private JMSHandler jmsHandler;
075:
076: private JMSEndpoint endpoint;
077:
078: // private MessageExchange exchange;
079:
080: // added in JMS 1.2
081: Message jmsInMessage = null;
082:
083: private FaultMessageTypes faultType = null;
084:
085: //Important: rename retrysLeft -> retrysCount, we want it compare to endpoint.getMaxRetryCount() directly,
086: //So we can support use change MaxRetryCount property of endpoint dynamically by Admin Console
087: protected AtomicInteger retrysCount = new AtomicInteger(0);
088:
089:// protected long retryInterval = 0;
090:
091:
092:
093: private Work pollWork = null;
094:
095: // since version 1.2b
096: private boolean recoverableInRetryFault=false;
097:
098: // private boolean needsReply;
099: //
100:
101: public ConsumerProcessor(JMSEndpoint endpoint) {
102: super (endpoint);
103: this .endpoint = endpoint;
104: }
105:
106: protected void doStart() throws ConfigurationException,
107: ResourcesConnectionException, JMSException, WorkException,
108: InterruptedException {
109:
110: // try {
111: String initialContextFactory = null;
112: String providerUrl = null;
113: String connectionFactoryName = null;
114: String userName = null;
115: String password = null;
116:
117: if (endpoint.getJndiInitialContextFactory() != null)
118: initialContextFactory = endpoint.getJndiInitialContextFactory();
119:
120: if (endpoint.getJndiProviderUrl() != null)
121: providerUrl = endpoint.getJndiProviderUrl();
122:
123: if (endpoint.getJndiConnectionFactoryName() != null)
124: connectionFactoryName = endpoint.getJndiConnectionFactoryName();
125:
126: logger.debug("defaultMep:"+endpoint.getDefaultMep());
127:
128: if (!endpoint.getDefaultMep().equals(WsdlMepConstants.IN_ONLY)) {
129: // creating a tranactional/nonTransactional handler
130: logger.info("Creating transactional JMS Handler");
131: jmsHandler = new JMSConsumerHandler(initialContextFactory, providerUrl,
132: connectionFactoryName, userName, password, endpoint
133: .getDestinationStyle(), endpoint
134: .getTargetDestinationName(), endpoint
135: .getReplyDestinationName(), endpoint
136: .isTransactional(),endpoint.getDLQ());
137: // jmsHandler.setExceptionListener(this);
138: //retry relate to transaction.
139: if (endpoint.isTransactional() && endpoint.isRetry()) // getting retry settings
140: {
141:// synchronized(retrysCount)
142:// {retrysCount.set(endpoint.getMaxRetryCount());
143:// retrysCount.notify();
144:// }
145: //retryInterval = endpoint.getRetryInterval();
146: logger.debug("Retry mode enabled. "+endpoint.getMaxRetryCount()+" retry's count with interval of "+endpoint.getRetryInterval()+" ms");
147: }else
148: {
149: logger.debug("Retry mode Disabled.");
150: }
151:
152: } else// Inonly Mep: non transactional handler
153: {
154: //Transaction not supported for InOnly MEP
155: logger.info("Creating nontransactional JMS Handler");
156: jmsHandler = new JMSConsumerHandler(initialContextFactory, providerUrl,
157: connectionFactoryName, userName, password, endpoint
158: .getDestinationStyle(), endpoint
159: .getTargetDestinationName(), endpoint
160: .getReplyDestinationName(), false,endpoint.getDLQ());
161: }
162:
163: synchronized (running) {
164: synchronized(retrysCount){
165: pollWork = new Work() {
166: public void release() {}
167: public void run()
168: {ConsumerProcessor.this .poll();}
169: };
170: endpoint.getServiceUnit().getComponent().getWorkManager()
171: .startWork(pollWork);
172: running.wait();
173: retrysCount.wait();
174: }
175: }
176: }
177: protected void doStop(boolean retryStopped) throws Exception {
178: logger.debug("try to stop.........");
179: if(jmsHandler!=null) {
180: logger.debug("try to role back and close jmshandler");
181: jmsHandler.rollback();
182: jmsHandler.close();
183: }
184: logger.debug("try to set running to false");
185: synchronized(retrysCount)
186: {
187: retrysCount.incrementAndGet(); // decreasing retrys left
188: retrysCount.notify();
189: }
190: synchronized(running) {
191: running.set(false);
192: running.notify();
193: }
194: logger.debug("setted running to false");
195: logger.debug("stop done");
196: if(retryStopped)
197: endpoint.setState(StatusConstants.STATUS_RETRY_STOPPRD);
198: else
199: endpoint.setState(StatusConstants.STATUS_DOWN);
200:
201: }
202: /**
203: * transacted pool
204: */
205: protected void poll() {
206: synchronized (running) {
207: running.set(true);
208: running.notify();
209: }
210: try {
211: while (running.get()) {
212: logger.info("Consumer polling message from queue.");
213: jmsInMessage = null;
214: // just in case the selector is used, then we filter the recived
215: // messages based on selector
216: if (endpoint.isEnableQuery()) {
217: jmsInMessage = jmsHandler.receiveFiltered(endpoint
218: .getQueryExpression(), false,true);
219: } else {
220: //jmsInMessage = jmsHandler.receive();
221: jmsInMessage = jmsHandler.receiveFiltered("", false,true);
222: }
223:
224: if (jmsInMessage != null) {
225:
226: debugJmsMessageProperties(jmsInMessage); // Print properties
227:
228: ExternalInput input = jmsMessageToExternalInput();
229:
230: while (input.hasMoreData()) {
231: // exchange = null;
232: long startProcessTime=System.currentTimeMillis();
233: faultType = null;// forget any previous error
234: logger.debug("POLL() BEFORE PROCESS(INPUT)");
235: process(input); // this does a callback to transform()
236: logger.debug("POLL() AFTER PROCESS(INPUT)");
237:
238: if (faultType != null){
239: //Recoverable fault, already roleback in doProcessFault() method
240: if(faultType.equals(FaultMessageTypes.RECOVERABLE)){
241: synchronized (pollWork){
242: pollWork.wait(endpoint.getRetryInterval());
243: }
244: break;
245: }
246: //UnRecoverable falut, already commit in DoProcessFault() method
247: //else continue;
248: }
249: //No fault
250: else {
251: jmsHandler.commit();
252: if(!endpoint.getState().equals(StatusConstants.STATUS_UP))
253: endpoint.setState(StatusConstants.STATUS_UP);
254:
255: }
256: endpoint.sendMessageProcessedNotification(System.currentTimeMillis()-startProcessTime);
257:
258: }
259: }
260: if(jmsHandler.isClosed()) {
261: logger.info("jmsHandler closed");
262:
263: break;
264: }
265: }
266:
267: } catch (Exception e) {
268: ErrorUtil.printError("Exception in poll(): ", e);
269:
270: } finally {
271: synchronized (running) {
272: running.set(false);
273: running.notify();
274: logger.info("Polling Stoped; \"running\" flag set to False");
275: }
276: }
277: }
278:
279: private ExternalInput jmsMessageToExternalInput()
280: throws Exception {
281: String charset = endpoint.getCharset();
282: String readStyle = endpoint.getReadStyle();
283: String recType = endpoint.getRecordType();
284: int rpm = Integer.parseInt(endpoint.getRecordsPerMessage());
285: ExternalInput input = null;
286: if (jmsInMessage instanceof TextMessage) {
287: TextMessage tm = (TextMessage) jmsInMessage;
288: input = new ExternalInput(new StringReader(tm.getText()), charset,
289: readStyle, recType, rpm);
290: } else if (jmsInMessage instanceof BytesMessage) {
291: BytesMessage bm = (BytesMessage) jmsInMessage;
292: ByteArrayOutputStream baos = new ByteArrayOutputStream();
293: byte[] buf = new byte[2048];
294: for (int l = 0; (l = bm.readBytes(buf)) > 0;)
295: baos.write(buf, 0, l);
296: input = new ExternalInput(new ByteArrayInputStream(baos
297: .toByteArray()), charset, readStyle, recType, rpm);
298: } else
299: throw new Exception("unsupported JMS message type " + jmsInMessage);
300: return input;
301: }
302:
303: public void transform(Object data, MessageExchange me) throws Exception {
304: super .transform(data, me);
305: // getting all properties from jmsInMessage and setting them to
306: // NMExchange
307: JMSMarshaler.addMetatataToNMExchange(me, jmsInMessage);
308: // exchange = me;
309: }
310:
311: @Override
312: protected void doProcessFault(NormalizedMessage nm, String fault)
313: throws JMSException, InterruptedException, ConfigurationException,Exception {
314: // /if there is any kind of fault
315: // message then just put the JMSMessage back in the queue
316: recoverableInRetryFault=false;
317: FaultHandler fh = new FaultHandler(fault);
318:
319: logger.info("Consumer endpoint '" + endpoint.getEndpoint()
320: + "' reports error: " + fh.getMessage());
321: if (fh.getEndpointString() != null)
322: logger.info("The error was occured at the endpoint: "
323: + fh.getEndpointString());
324:
325: logger.debug("Fault handler: isRecoverable="+fh.isRecoverable());
326:
327: if (fh.isRecoverable()) {
328: faultType = FaultMessageTypes.RECOVERABLE;
329:
330:
331: jmsHandler.rollback();
332:// send notification
333: endpoint.sendJMSTransactionRollBackNotification();
334:
335: logger
336: .info("JMS Rollback in doProcessFault, fault is Recoverable");
337:
338: if (endpoint.isRetry()) { // if in retry mode
339: logger.debug("JMS Is in retry Mode: retryInterval="
340: + endpoint.getRetryInterval() + "; Retry's count=" + retrysCount.get());
341:
342: //Thread.sleep(retryInterval); // wait a while,
343: synchronized(retrysCount)
344: {
345: retrysCount.incrementAndGet(); // decreasing retrys left
346: retrysCount.notify();
347: }
348: // stop message pooling
349: if (retrysCount.get() >=endpoint.getMaxRetryCount()){
350: logger.info("retry count exceeded");
351: doStop(true);
352: //send notification
353: logger.debug("send notification :retry count exceeded");
354: endpoint.sendJMSRetryCountExceededNotification();
355: logger.debug("set state:retry stooped");
356: }else
357: {
358: recoverableInRetryFault=true;
359: endpoint.setState(StatusConstants.STATUS_RETRY_STARTED);
360:
361: }
362: } else // if in noRetry mode and recoverable fault
363: // then just stop the component and roll back
364: {
365: doStop(false);
366:
367: }
368:
369: } else {
370: faultType = FaultMessageTypes.UNRECOVERABLE;
371: logger.debug("before write to DLQ");
372: jmsHandler.writeToDLQ(jmsInMessage);
373: logger.debug("after write to DLQ");
374:// send notification
375: endpoint.sendJMSDLQNotification();
376: logger.debug("after sent DLQ notification");
377: jmsHandler.commit();
378: logger.info("JMS Commit in doProcessFault, Fault is Urecoverable");
379: }
380: }
381:
382: /*
383: * Process out situation
384: *
385: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseConsumerProcessor#doProcessOut(javax.jbi.messaging.NormalizedMessage,
386: * java.lang.String)
387: */
388: @Override
389: protected void doProcessOut(NormalizedMessage nm, String s,
390: MessageExchange exchange) throws JMSException,
391: ConfigurationException, TransformerConfigurationException,
392: IOException, TransformerException {
393: String charset = endpoint.getCharset();
394:
395: //TODO: does reply in consumer need to check savaMetadata
396: Message jmsReply = JMSMarshaler.normalizedToJMS(exchange
397: .getMessage("out"), exchange, endpoint.getWriteStyle(),
398: charset, jmsHandler.getSession(),false);
399: // getting all properties from mExchange and setting them to the
400: // jms message
401: // JMSMarshaler.addMetatataToJms(jmsReply, exchange);
402:
403: // sending the message to reply queue
404: jmsHandler.reply(jmsInMessage, jmsReply);
405: // Process the Out message and write it into the reply queue.
406: }
407:
408: /* (non-Javadoc)
409: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbConsumerProcessor#doProcessFault(javax.jbi.messaging.NormalizedMessage)
410: */
411: @Override
412: protected String doProcessFault(NormalizedMessage nm) throws JBIException,
413: CbesbException {
414: //needs to be overwritten because of the bug 474
415: String fault = null;
416: if (nm != null) {
417: fault = SourceHelper.createString(nm.getContent());
418: }
419: try {
420: doProcessFault(nm, fault);
421: //since version 1.2b
422: if (recoverableInRetryFault) return "retry";
423: //
424: } catch (Exception e) {
425: throw CbesbException.create(e);
426: }
427: return fault;
428: }
429:
430: /**
431: * Print out at DEBUG level, jms message properties
432: * @param msg
433: * @throws JMSException
434: */
435: protected void debugJmsMessageProperties(Message msg) throws JMSException
436: {
437: Enumeration enumeration=msg.getPropertyNames();
438: String result="JMS Message Properies:\n";
439: while (enumeration.hasMoreElements())
440: {
441: String propName =(String)enumeration.nextElement();
442: Object propValue = msg.getObjectProperty(propName);
443: result+=("["+propName+"]"+"="+propValue)+"\n";
444: }
445: result+=("JMS Message Properies:(endprint)--->")+"\n";
446: logger.debug(result);
447: }
448:
449:}
|