001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jbi.messaging;
018:
019: import java.util.ArrayList;
020: import java.util.List;
021: import java.util.Map;
022: import java.util.concurrent.ArrayBlockingQueue;
023: import java.util.concurrent.BlockingQueue;
024: import java.util.concurrent.ConcurrentHashMap;
025: import java.util.concurrent.TimeUnit;
026: import java.util.concurrent.atomic.AtomicBoolean;
027:
028: import javax.jbi.JBIException;
029: import javax.jbi.component.Component;
030: import javax.jbi.component.ComponentLifeCycle;
031: import javax.jbi.messaging.DeliveryChannel;
032: import javax.jbi.messaging.ExchangeStatus;
033: import javax.jbi.messaging.MessageExchange;
034: import javax.jbi.messaging.MessageExchange.Role;
035: import javax.jbi.messaging.MessageExchangeFactory;
036: import javax.jbi.messaging.MessagingException;
037: import javax.jbi.servicedesc.ServiceEndpoint;
038: import javax.transaction.Transaction;
039: import javax.transaction.TransactionManager;
040: import javax.xml.namespace.QName;
041:
042: import org.apache.commons.logging.Log;
043: import org.apache.commons.logging.LogFactory;
044: import org.apache.servicemix.JbiConstants;
045: import org.apache.servicemix.MessageExchangeListener;
046: import org.apache.servicemix.id.IdGenerator;
047: import org.apache.servicemix.jbi.ExchangeTimeoutException;
048: import org.apache.servicemix.jbi.container.ActivationSpec;
049: import org.apache.servicemix.jbi.container.JBIContainer;
050: import org.apache.servicemix.jbi.event.ExchangeEvent;
051: import org.apache.servicemix.jbi.event.ExchangeListener;
052: import org.apache.servicemix.jbi.framework.ComponentContextImpl;
053: import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
054:
055: /**
056: * DeliveryChannel implementation
057: *
058: * @version $Revision: 564607 $
059: */
060: public class DeliveryChannelImpl implements DeliveryChannel {
061:
062: private static final Log LOG = LogFactory
063: .getLog(DeliveryChannelImpl.class);
064:
065: private JBIContainer container;
066:
067: private ComponentContextImpl context;
068:
069: private ComponentMBeanImpl component;
070:
071: private BlockingQueue<MessageExchangeImpl> queue;
072:
073: private IdGenerator idGenerator = new IdGenerator();
074:
075: private MessageExchangeFactory inboundFactory;
076:
077: private int intervalCount;
078:
079: private AtomicBoolean closed = new AtomicBoolean(false);
080:
081: private Map<Thread, Boolean> waiters = new ConcurrentHashMap<Thread, Boolean>();
082:
083: private TransactionManager transactionManager;
084:
/**
 * When using clustering and sendSync, the exchange received will not be the
 * same as the one sent (because it has been serialized/deserialized). We
 * thus need to keep the original exchange in a map and override its state.
 */
090: private Map<String, MessageExchangeImpl> exchangesById = new ConcurrentHashMap<String, MessageExchangeImpl>();
091:
092: /**
093: * Constructor
094: */
095: public DeliveryChannelImpl(ComponentMBeanImpl component) {
096: this .component = component;
097: this .container = component.getContainer();
098: this .queue = new ArrayBlockingQueue<MessageExchangeImpl>(
099: component.getInboundQueueCapacity());
100: this .transactionManager = (TransactionManager) this .container
101: .getTransactionManager();
102: }
103:
104: /**
105: * @return size of the inbound Queue
106: */
107: public int getQueueSize() {
108: return queue.size();
109: }
110:
111: /**
112: * close the delivery channel
113: *
114: * @throws MessagingException
115: */
116: public void close() throws MessagingException {
117: if (this .closed.compareAndSet(false, true)) {
118: if (LOG.isDebugEnabled()) {
119: LOG.debug("Closing DeliveryChannel " + this );
120: }
121: List<MessageExchangeImpl> pending = new ArrayList<MessageExchangeImpl>(
122: queue.size());
123: queue.drainTo(pending);
124: for (MessageExchangeImpl messageExchange : pending) {
125: if (messageExchange.getTransactionContext() != null
126: && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
127: notifyExchange(messageExchange.getMirror(),
128: messageExchange.getMirror(), "close");
129: }
130: }
131: // Interrupt all blocked thread
132: Thread[] threads = waiters.keySet().toArray(
133: new Thread[waiters.size()]);
134: for (int i = 0; i < threads.length; i++) {
135: threads[i].interrupt();
136: }
137: // deactivate all endpoints from this component
138: ServiceEndpoint[] endpoints = container.getRegistry()
139: .getEndpointsForComponent(
140: component.getComponentNameSpace());
141: for (int i = 0; i < endpoints.length; i++) {
142: try {
143: component.getContext().deactivateEndpoint(
144: endpoints[i]);
145: } catch (JBIException e) {
146: LOG.error("Error deactivating endpoint", e);
147: }
148: }
149: // TODO: Cause all accepts to return null
150: // TODO: Abort all pending exchanges
151: }
152: }
153:
154: protected void checkNotClosed() throws MessagingException {
155: if (closed.get()) {
156: throw new MessagingException(this + " has been closed.");
157: }
158: }
159:
160: /**
161: * Create a message exchange factory. This factory will create exchange
162: * instances with all appropriate properties set to null.
163: *
164: * @return a message exchange factory
165: */
166: public MessageExchangeFactory createExchangeFactory() {
167: MessageExchangeFactoryImpl result = createMessageExchangeFactory();
168: result.setContext(context);
169: ActivationSpec activationSpec = context.getActivationSpec();
170: if (activationSpec != null) {
171: String componentName = context.getComponentNameSpace()
172: .getName();
173: // lets auto-default the container-routing information
174: QName serviceName = activationSpec.getDestinationService();
175: if (serviceName != null) {
176: result.setServiceName(serviceName);
177: LOG.debug("default destination serviceName for "
178: + componentName + " = " + serviceName);
179: }
180: QName interfaceName = activationSpec
181: .getDestinationInterface();
182: if (interfaceName != null) {
183: result.setInterfaceName(interfaceName);
184: LOG.debug("default destination interfaceName for "
185: + componentName + " = " + interfaceName);
186: }
187: QName operationName = activationSpec
188: .getDestinationOperation();
189: if (operationName != null) {
190: result.setOperationName(operationName);
191: LOG.debug("default destination operationName for "
192: + componentName + " = " + operationName);
193: }
194: String endpointName = activationSpec
195: .getDestinationEndpoint();
196: if (endpointName != null) {
197: boolean endpointSet = false;
198: LOG.debug("default destination endpointName for "
199: + componentName + " = " + endpointName);
200: if (serviceName != null && endpointName != null) {
201: endpointName = endpointName.trim();
202: ServiceEndpoint endpoint = container.getRegistry()
203: .getEndpoint(serviceName, endpointName);
204: if (endpoint != null) {
205: result.setEndpoint(endpoint);
206: LOG
207: .info("Set default destination endpoint for "
208: + componentName
209: + " to "
210: + endpoint);
211: endpointSet = true;
212: }
213: }
214: if (!endpointSet) {
215: LOG.warn("Could not find destination endpoint for "
216: + componentName + " service(" + serviceName
217: + ") with endpointName " + endpointName);
218: }
219: }
220: }
221: return result;
222: }
223:
224: /**
225: * Create a message exchange factory for the given interface name.
226: *
227: * @param interfaceName
228: * name of the interface for which all exchanges created by the
229: * returned factory will be set
230: * @return an exchange factory that will create exchanges for the given
231: * interface; must be non-null
232: */
233: public MessageExchangeFactory createExchangeFactory(
234: QName interfaceName) {
235: MessageExchangeFactoryImpl result = createMessageExchangeFactory();
236: result.setInterfaceName(interfaceName);
237: return result;
238: }
239:
240: /**
241: * Create a message exchange factory for the given service name.
242: *
243: * @param serviceName
244: * name of the service for which all exchanges created by the
245: * returned factory will be set
246: * @return an exchange factory that will create exchanges for the given
247: * service; must be non-null
248: */
249: public MessageExchangeFactory createExchangeFactoryForService(
250: QName serviceName) {
251: MessageExchangeFactoryImpl result = createMessageExchangeFactory();
252: result.setServiceName(serviceName);
253: return result;
254: }
255:
256: /**
257: * Create a message exchange factory for the given endpoint.
258: *
259: * @param endpoint
260: * endpoint for which all exchanges created by the returned
261: * factory will be set for
262: * @return an exchange factory that will create exchanges for the given
263: * endpoint
264: */
265: public MessageExchangeFactory createExchangeFactory(
266: ServiceEndpoint endpoint) {
267: MessageExchangeFactoryImpl result = createMessageExchangeFactory();
268: result.setEndpoint(endpoint);
269: return result;
270: }
271:
272: protected MessageExchangeFactoryImpl createMessageExchangeFactory() {
273: MessageExchangeFactoryImpl messageExchangeFactory = new MessageExchangeFactoryImpl(
274: idGenerator, closed);
275: messageExchangeFactory.setContext(context);
276: return messageExchangeFactory;
277: }
278:
279: /**
280: * @return a MessageExchange - blocking call
281: * @throws MessagingException
282: */
283: public MessageExchange accept() throws MessagingException {
284: return accept(Long.MAX_VALUE);
285: }
286:
287: /**
288: * return a MessageExchange
289: *
290: * @param timeoutMS
291: * @return Message Exchange
292: * @throws MessagingException
293: */
294: public MessageExchange accept(long timeoutMS)
295: throws MessagingException {
296: try {
297: checkNotClosed();
298: MessageExchangeImpl me = queue.poll(timeoutMS,
299: TimeUnit.MILLISECONDS);
300: if (me != null) {
301: // If the exchange has already timed out,
302: // do not give it to the component
303: if (me.getPacket().isAborted()) {
304: if (LOG.isDebugEnabled()) {
305: LOG.debug("Aborted " + me.getExchangeId()
306: + " in " + this );
307: }
308: me = null;
309: } else {
310: if (LOG.isDebugEnabled()) {
311: LOG.debug("Accepting " + me.getExchangeId()
312: + " in " + this );
313: }
314: // If we have a tx lock and the exchange is not active, we
315: // need
316: // to notify here without resuming transaction
317: if (me.getTxLock() != null
318: && me.getStatus() != ExchangeStatus.ACTIVE) {
319: notifyExchange(me.getMirror(), me.getTxLock(),
320: "acceptFinishedExchangeWithTxLock");
321: me.handleAccept();
322: if (LOG.isTraceEnabled()) {
323: LOG.trace("Accepted: " + me);
324: }
325: // We transactionnaly deliver a finished exchange
326: } else if (me.isTransacted()
327: && me.getStatus() != ExchangeStatus.ACTIVE) {
328: // Do not resume transaction
329: me.handleAccept();
330: if (LOG.isTraceEnabled()) {
331: LOG.trace("Accepted: " + me);
332: }
333: } else {
334: resumeTx(me);
335: me.handleAccept();
336: if (LOG.isTraceEnabled()) {
337: LOG.trace("Accepted: " + me);
338: }
339: }
340: }
341: }
342: if (me != null) {
343: // Call input listeners
344: ExchangeListener[] l = (ExchangeListener[]) container
345: .getListeners(ExchangeListener.class);
346: ExchangeEvent event = new ExchangeEvent(me,
347: ExchangeEvent.EXCHANGE_ACCEPTED);
348: for (int i = 0; i < l.length; i++) {
349: try {
350: l[i].exchangeAccepted(event);
351: } catch (Exception e) {
352: LOG.warn("Error calling listener: "
353: + e.getMessage(), e);
354: }
355: }
356: }
357: return me;
358: } catch (InterruptedException e) {
359: throw new MessagingException("accept failed", e);
360: }
361: }
362:
363: protected void autoSetPersistent(MessageExchangeImpl me) {
364: Boolean persistent = me.getPersistent();
365: if (persistent == null) {
366: if (context.getActivationSpec().getPersistent() != null) {
367: persistent = context.getActivationSpec()
368: .getPersistent();
369: } else {
370: persistent = Boolean.valueOf(context.getContainer()
371: .isPersistent());
372: }
373: me.setPersistent(persistent);
374: }
375: }
376:
377: protected void throttle() {
378: if (component.isExchangeThrottling()) {
379: if (component.getThrottlingInterval() > intervalCount) {
380: intervalCount = 0;
381: try {
382: Thread.sleep(component.getThrottlingTimeout());
383: } catch (InterruptedException e) {
384: LOG.warn("throttling failed", e);
385: }
386: }
387: intervalCount++;
388: }
389: }
390:
391: protected void doSend(MessageExchangeImpl me, boolean sync)
392: throws MessagingException {
393: MessageExchangeImpl mirror = me.getMirror();
394: boolean finished = me.getStatus() != ExchangeStatus.ACTIVE;
395: try {
396: if (LOG.isTraceEnabled()) {
397: LOG.trace("Sent: " + me);
398: }
399: // If the message has timed out
400: if (me.getPacket().isAborted()) {
401: throw new ExchangeTimeoutException(me);
402: }
403: // Auto enlist exchange in transaction
404: autoEnlistInTx(me);
405: // Update persistence info
406: autoSetPersistent(me);
407: // Throttle if needed
408: throttle();
409: // Store the consumer component
410: if (me.getRole() == Role.CONSUMER) {
411: me.setSourceId(component.getComponentNameSpace());
412: }
413: // Call the listeners before the ownership changes
414: // Call input listeners
415: ExchangeListener[] l = (ExchangeListener[]) container
416: .getListeners(ExchangeListener.class);
417: ExchangeEvent event = new ExchangeEvent(me,
418: ExchangeEvent.EXCHANGE_SENT);
419: for (int i = 0; i < l.length; i++) {
420: try {
421: l[i].exchangeSent(event);
422: } catch (Exception e) {
423: LOG.warn("Error calling listener: "
424: + e.getMessage(), e);
425: }
426: }
427: // Change ownership
428: me.handleSend(sync);
429: mirror.setTxState(MessageExchangeImpl.TX_STATE_NONE);
430: // If this is the DONE or ERROR status from a synchronous
431: // transactional exchange,
432: // it should not be part of the transaction, so remove the tx
433: // context
434: if (finished
435: && me.getTxLock() == null
436: && me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED
437: && !me.isPushDelivery()
438: && me.getRole() == Role.CONSUMER) {
439: me.setTransactionContext(null);
440: }
441: container.sendExchange(mirror);
442: } catch (MessagingException e) {
443: if (LOG.isDebugEnabled()) {
444: LOG.debug("Exception processing: " + me.getExchangeId()
445: + " in " + this );
446: }
447: throw e;
448: } finally {
449: // If there is a tx lock, we need to suspend and notify
450: if (me.getTxLock() != null) {
451: if (mirror.getTxState() == MessageExchangeImpl.TX_STATE_ENLISTED) {
452: suspendTx(mirror);
453: }
454: synchronized (me.getTxLock()) {
455: notifyExchange(me, me.getTxLock(),
456: "doSendWithTxLock");
457: }
458: }
459: }
460: }
461:
462: /**
463: * routes a MessageExchange
464: *
465: * @param messageExchange
466: * @throws MessagingException
467: */
468: public void send(MessageExchange messageExchange)
469: throws MessagingException {
470: // If the delivery channel has been closed
471: checkNotClosed();
472: // Log call
473: if (LOG.isDebugEnabled()) {
474: LOG.debug("Send " + messageExchange.getExchangeId()
475: + " in " + this );
476: }
477: // // JBI 5.5.2.1.3: remove sync property
478: messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
479: // Call doSend
480: MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
481: doSend(me, false);
482: }
483:
484: /**
485: * routes a MessageExchange
486: *
487: * @param messageExchange
488: * @return true if processed
489: * @throws MessagingException
490: */
491: public boolean sendSync(MessageExchange messageExchange)
492: throws MessagingException {
493: return sendSync(messageExchange, 0);
494: }
495:
496: /**
497: * routes a MessageExchange
498: *
499: * @param messageExchange
500: * @param timeout
501: * @return true if processed
502: * @throws MessagingException
503: */
504: public boolean sendSync(MessageExchange messageExchange,
505: long timeout) throws MessagingException {
506: // If the delivery channel has been closed
507: checkNotClosed();
508: // Log call
509: if (LOG.isDebugEnabled()) {
510: LOG.debug("SendSync " + messageExchange.getExchangeId()
511: + " in " + this );
512: }
513: boolean result = false;
514: // JBI 5.5.2.1.3: set the sendSync property
515: messageExchange.setProperty(JbiConstants.SEND_SYNC,
516: Boolean.TRUE);
517: // Call doSend
518: MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
519: String exchangeKey = me.getKey();
520: try {
521: exchangesById.put(exchangeKey, me);
522: // Synchronously send a message and wait for the response
523: synchronized (me) {
524: doSend(me, true);
525: if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
526: waitForExchange(me, me, timeout, "sendSync");
527: } else {
528: if (LOG.isDebugEnabled()) {
529: LOG
530: .debug("Exchange "
531: + messageExchange
532: .getExchangeId()
533: + " has already been answered (no need to wait)");
534: }
535: }
536: }
537: if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
538: me.handleAccept();
539: // If the sender flag has been removed, it means
540: // the message has been delivered in the same thread
541: // so there is no need to resume the transaction
542: // See processInBound
543: // if (messageExchangeImpl.getSyncSenderThread() != null) {
544: resumeTx(me);
545: // }
546: result = true;
547: } else {
548: // JBI 5.5.2.1.3: the exchange should be set to ERROR status
549: if (LOG.isDebugEnabled()) {
550: LOG.debug("Exchange "
551: + messageExchange.getExchangeId()
552: + " has been aborted");
553: }
554: me.getPacket().setAborted(true);
555: result = false;
556: }
557: } catch (InterruptedException e) {
558: throw new MessagingException(e);
559: } catch (RuntimeException e) {
560: // e.printStackTrace();
561: throw e;
562: } finally {
563: exchangesById.remove(exchangeKey);
564: }
565: return result;
566: }
567:
568: /**
569: * @return Returns the container.
570: */
571: public JBIContainer getContainer() {
572: return container;
573: }
574:
575: /**
576: * @param container
577: * The container to set.
578: */
579: public void setContainer(JBIContainer container) {
580: this .container = container;
581: }
582:
583: /**
584: * @return Returns the componentConnector.
585: */
586: public ComponentMBeanImpl getComponent() {
587: return component;
588: }
589:
590: /**
591: * Get the context
592: *
593: * @return the context
594: */
595: public ComponentContextImpl getContext() {
596: return context;
597: }
598:
599: /**
600: * set the context
601: *
602: * @param context
603: */
604: public void setContext(ComponentContextImpl context) {
605: this .context = context;
606: }
607:
608: /**
609: * Used internally for passing in a MessageExchange
610: *
611: * @param me
612: * @throws MessagingException
613: */
614: public void processInBound(MessageExchangeImpl me)
615: throws MessagingException {
616: if (LOG.isTraceEnabled()) {
617: LOG.trace("Processing inbound exchange: " + me);
618: }
619: // Check if the delivery channel has been closed
620: checkNotClosed();
621: // Retrieve the original exchange sent
622: MessageExchangeImpl original = exchangesById.get(me.getKey());
623: if (original != null && me != original) {
624: original.copyFrom(me);
625: me = original;
626: }
627: // Check if the incoming exchange is a response to a synchronous
628: // exchange previously sent
629: // In this case, we do not have to queue it, but rather notify the
630: // waiting thread.
631: if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
632: // If the mirror has been delivered using push, better wait until
633: // the push call return. This can only work if not using clustered
634: // flows,
635: // but the flag is transient so we do not care.
636: // Ensure that data is uptodate with the incoming exchange (in
637: // case the exchange has
638: // been serialized / deserialized by a clustered flow)
639: suspendTx(original);
640: me
641: .setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
642: notifyExchange(original, original,
643: "processInboundSynchronousExchange");
644: return;
645: }
646:
647: // If the component implements the MessageExchangeListener,
648: // the delivery can be made synchronously, so we don't need
649: // to bother with transactions
650: MessageExchangeListener listener = getExchangeListener();
651: if (listener != null) {
652: me.handleAccept();
653: if (LOG.isTraceEnabled()) {
654: LOG.trace("Received: " + me);
655: }
656: // Call input listeners
657: ExchangeListener[] l = (ExchangeListener[]) container
658: .getListeners(ExchangeListener.class);
659: ExchangeEvent event = new ExchangeEvent(me,
660: ExchangeEvent.EXCHANGE_ACCEPTED);
661: for (int i = 0; i < l.length; i++) {
662: try {
663: l[i].exchangeAccepted(event);
664: } catch (Exception e) {
665: LOG.warn("Error calling listener: "
666: + e.getMessage(), e);
667: }
668: }
669: // Set the flag the the exchange was delivered using push mode
670: // This is important for transaction boundaries
671: me.setPushDeliver(true);
672: // Deliver the exchange
673: ClassLoader old = Thread.currentThread()
674: .getContextClassLoader();
675: try {
676: Thread.currentThread().setContextClassLoader(
677: component.getComponent().getClass()
678: .getClassLoader());
679: listener.onMessageExchange(me);
680: } finally {
681: Thread.currentThread().setContextClassLoader(old);
682: }
683: // TODO: handle delayed exchange notifications
684: return;
685: }
686:
687: // Component uses pull delivery.
688:
689: // If the exchange is transacted, special care should be taken.
690: // But if the exchange is no more ACTIVE, just queue it, as
691: // we will never have an answer back.
692: if (me.isTransacted()
693: && me.getStatus() == ExchangeStatus.ACTIVE) {
694: // If the transaction is conveyed by the exchange
695: // We do not need to resume the transaction in this thread
696: if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) {
697: try {
698: suspendTx(me);
699: queue.put(me);
700: } catch (InterruptedException e) {
701: LOG.debug("Exchange " + me.getExchangeId()
702: + " aborted due to thread interruption", e);
703: me.getPacket().setAborted(true);
704: }
705: // Else the delivery / send are enlisted in the current tx.
706: // We must suspend the transaction, queue it, and wait for the
707: // answer
708: // to be sent, at which time the tx should be suspended and resumed
709: // in
710: // this thread.
711: } else {
712: Object lock = new Object();
713: synchronized (lock) {
714: try {
715: me.setTxLock(lock);
716: suspendTx(me);
717: queue.put(me);
718: waitForExchange(me, lock, 0,
719: "processInboundTransactionalExchange");
720: } catch (InterruptedException e) {
721: LOG
722: .debug(
723: "Exchange "
724: + me.getExchangeId()
725: + " aborted due to thread interruption",
726: e);
727: me.getPacket().setAborted(true);
728: } finally {
729: me.setTxLock(null);
730: resumeTx(me);
731: }
732: }
733: }
734: // If the exchange is ACTIVE, the transaction boundary will suspended
735: // when the
736: // answer is sent
737: // Else just queue the exchange
738: } else {
739: try {
740: queue.put(me);
741: } catch (InterruptedException e) {
742: LOG.debug("Exchange " + me.getExchangeId()
743: + " aborted due to thread interruption", e);
744: me.getPacket().setAborted(true);
745: }
746: }
747: }
748:
749: protected MessageExchangeListener getExchangeListener() {
750: Component comp = this .component.getComponent();
751: if (comp instanceof MessageExchangeListener) {
752: return (MessageExchangeListener) comp;
753: }
754: ComponentLifeCycle lifecycle = this .component.getLifeCycle();
755: if (lifecycle instanceof MessageExchangeListener) {
756: return (MessageExchangeListener) lifecycle;
757: }
758: return null;
759: }
760:
761: /**
762: * Synchronization must be performed on the given exchange when calling this
763: * method
764: *
765: * @param me
766: * @throws InterruptedException
767: */
768: protected void waitForExchange(MessageExchangeImpl me, Object lock,
769: long timeout, String from) throws InterruptedException {
770: // If the channel is closed while here, we must abort
771: if (LOG.isDebugEnabled()) {
772: LOG.debug("Waiting for exchange " + me.getExchangeId()
773: + " (" + Integer.toHexString(me.hashCode())
774: + ") to be answered in " + this + " from " + from);
775: }
776: Thread th = Thread.currentThread();
777: try {
778: waiters.put(th, Boolean.TRUE);
779: lock.wait(timeout);
780: } finally {
781: waiters.remove(th);
782: }
783: if (LOG.isDebugEnabled()) {
784: LOG.debug("Notified: " + me.getExchangeId() + "("
785: + Integer.toHexString(me.hashCode()) + ") in "
786: + this + " from " + from);
787: }
788: }
789:
790: protected void notifyExchange(MessageExchangeImpl me, Object lock,
791: String from) {
792: if (LOG.isDebugEnabled()) {
793: LOG.debug("Notifying exchange " + me.getExchangeId() + "("
794: + Integer.toHexString(me.hashCode()) + ") in "
795: + this + " from " + from);
796: }
797: synchronized (lock) {
798: lock.notify();
799: }
800: }
801:
802: /**
803: * Get Inbound Factory
804: *
805: * @return the inbound message factory
806: */
807: public MessageExchangeFactory getInboundFactory() {
808: if (inboundFactory == null) {
809: inboundFactory = createExchangeFactory();
810: }
811: return inboundFactory;
812: }
813:
814: protected void suspendTx(MessageExchangeImpl me) {
815: if (transactionManager != null) {
816: try {
817: Transaction oldTx = me.getTransactionContext();
818: if (oldTx != null) {
819: if (LOG.isDebugEnabled()) {
820: LOG.debug("Suspending transaction for "
821: + me.getExchangeId() + " in " + this );
822: }
823: Transaction tx = transactionManager.suspend();
824: if (tx != oldTx) {
825: throw new IllegalStateException(
826: "the transaction context set in the messageExchange is not bound to the current thread");
827: }
828: }
829: } catch (Exception e) {
830: LOG.info("Exchange " + me.getExchangeId()
831: + " aborted due to transaction exception", e);
832: me.getPacket().setAborted(true);
833: }
834: }
835: }
836:
837: protected void resumeTx(MessageExchangeImpl me)
838: throws MessagingException {
839: if (transactionManager != null) {
840: try {
841: Transaction oldTx = me.getTransactionContext();
842: if (oldTx != null) {
843: if (LOG.isDebugEnabled()) {
844: LOG.debug("Resuming transaction for "
845: + me.getExchangeId() + " in " + this );
846: }
847: transactionManager.resume(oldTx);
848: }
849: } catch (Exception e) {
850: throw new MessagingException(e);
851: }
852: }
853: }
854:
855: /**
856: * If the jbi container configured to do so, the message exchange will
857: * automatically be enlisted in the current transaction, if exists.
858: *
859: * @throws MessagingException
860: */
861: protected void autoEnlistInTx(MessageExchangeImpl me)
862: throws MessagingException {
863: if (transactionManager != null
864: && container.isAutoEnlistInTransaction()) {
865: try {
866: Transaction tx = transactionManager.getTransaction();
867: if (tx != null) {
868: Object oldTx = me.getTransactionContext();
869: if (oldTx == null) {
870: me.setTransactionContext(tx);
871: } else if (oldTx != tx) {
872: throw new IllegalStateException(
873: "the transaction context set in the messageExchange is not bound to the current thread");
874: }
875: }
876: } catch (Exception e) {
877: throw new MessagingException(e);
878: }
879: }
880: }
881:
882: /**
883: * @return pretty print
884: */
885: public String toString() {
886: return "DeliveryChannel{" + component.getName() + "}";
887: }
888:
889: }
|