0001: /*
0002: * $Id: AbstractConnector.java 11343 2008-03-13 10:58:26Z tcarlson $
0003: * --------------------------------------------------------------------------------------
0004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
0005: *
0006: * The software in this package is published under the terms of the CPAL v1.0
0007: * license, a copy of which has been included with this distribution in the
0008: * LICENSE.txt file.
0009: */
0010:
0011: package org.mule.transport;
0012:
0013: import org.mule.DefaultExceptionStrategy;
0014: import org.mule.MuleSessionHandler;
0015: import org.mule.RegistryContext;
0016: import org.mule.api.MessagingException;
0017: import org.mule.api.MuleContext;
0018: import org.mule.api.MuleEvent;
0019: import org.mule.api.MuleException;
0020: import org.mule.api.MuleMessage;
0021: import org.mule.api.MuleRuntimeException;
0022: import org.mule.api.config.ThreadingProfile;
0023: import org.mule.api.context.WorkManager;
0024: import org.mule.api.context.notification.ServerNotification;
0025: import org.mule.api.context.notification.ServerNotificationHandler;
0026: import org.mule.api.endpoint.EndpointURI;
0027: import org.mule.api.endpoint.InboundEndpoint;
0028: import org.mule.api.endpoint.OutboundEndpoint;
0029: import org.mule.api.lifecycle.DisposeException;
0030: import org.mule.api.lifecycle.InitialisationException;
0031: import org.mule.api.lifecycle.LifecycleTransitionResult;
0032: import org.mule.api.registry.ServiceDescriptorFactory;
0033: import org.mule.api.registry.ServiceException;
0034: import org.mule.api.service.Service;
0035: import org.mule.api.transport.Connectable;
0036: import org.mule.api.transport.ConnectionStrategy;
0037: import org.mule.api.transport.Connector;
0038: import org.mule.api.transport.ConnectorException;
0039: import org.mule.api.transport.DispatchException;
0040: import org.mule.api.transport.MessageAdapter;
0041: import org.mule.api.transport.MessageDispatcher;
0042: import org.mule.api.transport.MessageDispatcherFactory;
0043: import org.mule.api.transport.MessageReceiver;
0044: import org.mule.api.transport.MessageRequester;
0045: import org.mule.api.transport.MessageRequesterFactory;
0046: import org.mule.api.transport.ReplyToHandler;
0047: import org.mule.api.transport.SessionHandler;
0048: import org.mule.config.i18n.CoreMessages;
0049: import org.mule.context.notification.ConnectionNotification;
0050: import org.mule.context.notification.MessageNotification;
0051: import org.mule.context.notification.OptimisedNotificationHandler;
0052: import org.mule.lifecycle.AlreadyInitialisedException;
0053: import org.mule.model.streaming.DelegatingInputStream;
0054: import org.mule.routing.filters.WildcardFilter;
0055: import org.mule.transformer.TransformerUtils;
0056: import org.mule.transport.service.TransportFactory;
0057: import org.mule.transport.service.TransportServiceDescriptor;
0058: import org.mule.transport.service.TransportServiceException;
0059: import org.mule.util.BeanUtils;
0060: import org.mule.util.ClassUtils;
0061: import org.mule.util.CollectionUtils;
0062: import org.mule.util.ObjectNameHelper;
0063: import org.mule.util.ObjectUtils;
0064: import org.mule.util.StringUtils;
0065: import org.mule.util.concurrent.NamedThreadFactory;
0066: import org.mule.util.concurrent.WaitableBoolean;
0067:
0068: import java.beans.ExceptionListener;
0069: import java.io.IOException;
0070: import java.io.InputStream;
0071: import java.io.OutputStream;
0072: import java.util.ArrayList;
0073: import java.util.Collections;
0074: import java.util.Iterator;
0075: import java.util.List;
0076: import java.util.Map;
0077: import java.util.Properties;
0078:
0079: import javax.resource.spi.work.WorkEvent;
0080: import javax.resource.spi.work.WorkListener;
0081:
0082: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
0083: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
0084: import edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService;
0085: import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
0086: import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
0087: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
0088: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
0089: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
0090:
0091: import org.apache.commons.logging.Log;
0092: import org.apache.commons.logging.LogFactory;
0093: import org.apache.commons.pool.KeyedPoolableObjectFactory;
0094: import org.apache.commons.pool.impl.GenericKeyedObjectPool;
0095:
0096: /**
0097: * <code>AbstractConnector</code> provides base functionality for all connectors
0098: * provided with Mule. Connectors are the mechanism used to connect to external
0099: * systems and protocols in order to send and receive data. <p/> The
0100: * <code>AbstractConnector</code> provides getter and setter methods for endpoint
0101: * name, transport name and protocol. It also provides methods to stop and start
0102: * connecotors and sets up a dispatcher threadpool which allows deriving connectors
0103: * the possibility to dispatch work to separate threads. This functionality is
0104: * controlled with the <i> doThreading</i> property on the threadingProfiles for
0105: * dispachers and receivers. The lifecycle for a connector is -
0106: * <ol>
0107: * <li>Create
0108: * <li>Initialise
0109: * <li>Connect
0110: * <li>Connect receivers
0111: * <li>Start
0112: * <li>Start Receivers
0113: * <li>Stop
0114: * <li>Stop Receivers
0115: * <li>Disconnect
0116: * <li>Disconnect Receivers
0117: * <li>Dispose
0118: * <li>Dispose Receivers
0119: * </ol>
0120: */
0121: public abstract class AbstractConnector implements Connector,
0122: ExceptionListener, Connectable, WorkListener {
0123: /**
0124: * Default number of concurrent transactional receivers.
0125: */
0126: public static final int DEFAULT_NUM_CONCURRENT_TX_RECEIVERS = 4;
0127:
0128: /**
0129: * logger used by this class
0130: */
0131: protected final Log logger = LogFactory.getLog(getClass());
0132:
0133: /**
0134: * Specifies if the endpoint started
0135: */
0136: protected final AtomicBoolean started = new AtomicBoolean(false);
0137:
0138: /**
0139: * True once the endpoint has been initialsed
0140: */
0141: protected final AtomicBoolean initialised = new AtomicBoolean(false);
0142:
0143: /**
0144: * The name that identifies the endpoint
0145: */
0146: protected volatile String name;
0147:
0148: /**
0149: * The exception strategy used by this connector
0150: */
0151: protected volatile ExceptionListener exceptionListener;
0152:
0153: /**
0154: * Determines in the connector is alive and well
0155: */
0156: protected final AtomicBoolean disposed = new AtomicBoolean(false);
0157:
0158: /**
0159: * Determines in connector has been told to dispose
0160: */
0161: protected final AtomicBoolean disposing = new AtomicBoolean(false);
0162:
0163: /**
0164: * Factory used to create dispatchers for this connector
0165: */
0166: protected volatile MessageDispatcherFactory dispatcherFactory;
0167:
0168: /**
0169: * Factory used to create requesters for this connector
0170: */
0171: protected volatile MessageRequesterFactory requesterFactory;
0172:
0173: /**
0174: * A pool of dispatchers for this connector, keyed by endpoint
0175: */
0176: protected final GenericKeyedObjectPool dispatchers = new GenericKeyedObjectPool();
0177:
0178: /**
0179: * A pool of requesters for this connector, keyed by endpoint
0180: */
0181: protected final GenericKeyedObjectPool requesters = new GenericKeyedObjectPool();
0182:
0183: /**
0184: * The collection of listeners on this connector. Keyed by entrypoint
0185: */
0186: protected final ConcurrentMap receivers = new ConcurrentHashMap();
0187:
0188: /**
0189: * Defines the dispatcher threading profile
0190: */
0191: private volatile ThreadingProfile dispatcherThreadingProfile;
0192:
0193: /**
0194: * Defines the requester threading profile
0195: */
0196: private volatile ThreadingProfile requesterThreadingProfile;
0197:
0198: /**
0199: * Defines the receiver threading profile
0200: */
0201: private volatile ThreadingProfile receiverThreadingProfile;
0202:
0203: /**
0204: * @see {@link #isCreateMultipleTransactedReceivers()}
0205: */
0206: protected volatile boolean createMultipleTransactedReceivers = true;
0207:
0208: /**
0209: * @see {@link #getNumberOfConcurrentTransactedReceivers()}
0210: */
0211: protected volatile int numberOfConcurrentTransactedReceivers = DEFAULT_NUM_CONCURRENT_TX_RECEIVERS;
0212:
0213: protected volatile ConnectionStrategy connectionStrategy;
0214:
0215: protected final WaitableBoolean connected = new WaitableBoolean(
0216: false);
0217:
0218: protected final WaitableBoolean connecting = new WaitableBoolean(
0219: false);
0220:
0221: /**
0222: * If the connect method was called via the start method, this will be set so
0223: * that when the connector comes on line it will be started
0224: */
0225: protected final WaitableBoolean startOnConnect = new WaitableBoolean(
0226: false);
0227:
0228: /**
0229: * Optimise the handling of message notifications. If dynamic is set to false then the
0230: * cached notification handler implements a shortcut for message notifications.
0231: */
0232: private boolean dynamicNotification = false;
0233: private ServerNotificationHandler cachedNotificationHandler;
0234:
0235: private final List supportedProtocols;
0236:
0237: /**
0238: * A shared work manager for all receivers registered with this connector.
0239: */
0240: private final AtomicReference/*<WorkManager>*/receiverWorkManager = new AtomicReference();
0241:
0242: /**
0243: * A shared work manager for all requesters created for this connector.
0244: */
0245: private final AtomicReference/*<WorkManager>*/dispatcherWorkManager = new AtomicReference();
0246:
0247: /**
0248: * A shared work manager for all requesters created for this connector.
0249: */
0250: private final AtomicReference/*<WorkManager>*/requesterWorkManager = new AtomicReference();
0251:
0252: /**
0253: * A generic scheduling service for tasks that need to be performed periodically.
0254: */
0255: private final AtomicReference/*<ScheduledExecutorService>*/scheduler = new AtomicReference();
0256:
0257: /**
0258: * Holds the service configuration for this connector
0259: */
0260: protected volatile TransportServiceDescriptor serviceDescriptor;
0261:
0262: /**
0263: * The map of service overrides that can be used to extend the capabilities of the
0264: * connector
0265: */
0266: protected volatile Properties serviceOverrides;
0267:
0268: /**
0269: * The strategy used for reading and writing session information to and fromt he
0270: * transport
0271: */
0272: protected volatile SessionHandler sessionHandler = new MuleSessionHandler();
0273:
0274: protected MuleContext muleContext;
0275:
0276: public AbstractConnector() {
0277: setDynamicNotification(false);
0278:
0279: // always add at least the default protocol
0280: supportedProtocols = new ArrayList();
0281: supportedProtocols.add(getProtocol().toLowerCase());
0282:
0283: connectionStrategy = new SingleAttemptConnectionStrategy();
0284:
0285: // TODO dispatcher pool configuration should be extracted, maybe even
0286: // moved into the factory?
0287: // NOTE: testOnBorrow MUST be FALSE. this is a bit of a design bug in
0288: // commons-pool since validate is used for both activation and passivation,
0289: // but has no way of knowing which way it is going.
0290: dispatchers.setTestOnBorrow(false);
0291: dispatchers.setTestOnReturn(true);
0292: requesters.setTestOnBorrow(false);
0293: requesters.setTestOnReturn(true);
0294: }
0295:
0296: /*
0297: * (non-Javadoc)
0298: *
0299: * @see org.mule.transport.UMOConnector#getName()
0300: */
0301: public String getName() {
0302: return name;
0303: }
0304:
0305: /*
0306: * (non-Javadoc)
0307: *
0308: * @see org.mule.transport.UMOConnector#setName(java.lang.String)
0309: */
0310: public void setName(String newName) {
0311: if (newName == null) {
0312: throw new IllegalArgumentException(CoreMessages
0313: .objectIsNull("Connector name").toString());
0314: }
0315:
0316: if (logger.isDebugEnabled()) {
0317: logger.debug("Set Connector name to: " + newName);
0318: }
0319:
0320: name = newName;
0321: }
0322:
0323: /*
0324: * (non-Javadoc)
0325: *
0326: * @see org.mule.transport.UMOConnector#create(java.util.HashMap)
0327: */
0328: public final synchronized LifecycleTransitionResult initialise()
0329: throws InitialisationException {
0330: if (initialised.get()) {
0331: InitialisationException e = new AlreadyInitialisedException(
0332: "Connector '" + getProtocol() + "." + getName()
0333: + "'", this );
0334: throw e;
0335: // Just log a warning since initializing twice is bad but might not be the end of the world.
0336: //logger.warn(e);
0337: }
0338:
0339: if (logger.isInfoEnabled()) {
0340: logger.info("Initialising: " + this );
0341: }
0342:
0343: // Use lazy-init (in get() methods) for this instead.
0344: //dispatcherThreadingProfile = muleContext.getDefaultMessageDispatcherThreadingProfile();
0345: //requesterThreadingProfile = muleContext.getDefaultMessageRequesterThreadingProfile();
0346: //receiverThreadingProfile = muleContext.getDefaultMessageReceiverThreadingProfile();
0347:
0348: // Initialise the structure of this connector
0349: this .initFromServiceDescriptor();
0350:
0351: this .doInitialise();
0352:
0353: // We do the management context injection here just in case we're using a default ExceptionStrategy
0354: //We always create a default just in case anything goes wrong before
0355: if (exceptionListener == null) {
0356: exceptionListener = new DefaultExceptionStrategy();
0357: ((DefaultExceptionStrategy) exceptionListener)
0358: .setMuleContext(muleContext);
0359: ((DefaultExceptionStrategy) exceptionListener).initialise();
0360: }
0361:
0362: try {
0363: initWorkManagers();
0364: } catch (MuleException e) {
0365: throw new InitialisationException(e, this );
0366: }
0367:
0368: initialised.set(true);
0369:
0370: return LifecycleTransitionResult.OK;
0371: }
0372:
0373: /*
0374: * (non-Javadoc)
0375: *
0376: * @see org.mule.api.transport.Connector#start()
0377: */
0378: public final synchronized LifecycleTransitionResult start()
0379: throws MuleException {
0380: this .checkDisposed();
0381:
0382: if (!this .isStarted()) {
0383: //TODO: Not sure about this. Right now the connector will connect only once
0384: // there is an endpoint associated with it and that endpoint is connected.
0385: // Do we also need the option of connecting the connector without any endpoints?
0386: // if (!this.isConnected())
0387: // {
0388: // startOnConnect.set(true);
0389: // this.getConnectionStrategy().connect(this);
0390: // // Only start once we are connected
0391: // return;
0392: // }
0393: if (!this .isConnected()) {
0394: startOnConnect.set(true);
0395: // Don't call getConnectionStrategy(), it clones the connection strategy.
0396: // Connectors should have a single reconnection thread, unlike per receiver/dispatcher
0397: connectionStrategy.connect(this );
0398: // Only start once we are connected
0399: return LifecycleTransitionResult.OK;
0400: }
0401:
0402: if (logger.isInfoEnabled()) {
0403: logger.info("Starting: " + this );
0404: }
0405:
0406: // the scheduler is recreated after stop()
0407: ScheduledExecutorService currentScheduler = (ScheduledExecutorService) scheduler
0408: .get();
0409: if (currentScheduler == null
0410: || currentScheduler.isShutdown()) {
0411: scheduler.set(this .getScheduler());
0412: }
0413:
0414: this .doStart();
0415: started.set(true);
0416:
0417: if (receivers != null) {
0418: for (Iterator iterator = receivers.values().iterator(); iterator
0419: .hasNext();) {
0420: MessageReceiver mr = (MessageReceiver) iterator
0421: .next();
0422: if (logger.isDebugEnabled()) {
0423: logger.debug("Starting receiver on endpoint: "
0424: + mr.getEndpoint().getEndpointURI());
0425: }
0426: mr.start();
0427: }
0428: }
0429:
0430: if (logger.isInfoEnabled()) {
0431: logger.info("Started: " + this );
0432: }
0433: }
0434: return LifecycleTransitionResult.OK;
0435: }
0436:
0437: /*
0438: * (non-Javadoc)
0439: *
0440: * @see org.mule.api.transport.Connector#isStarted()
0441: */
0442: public boolean isStarted() {
0443: return started.get();
0444: }
0445:
0446: /*
0447: * (non-Javadoc)
0448: *
0449: * @see org.mule.api.transport.Connector#stop()
0450: */
0451: public final synchronized LifecycleTransitionResult stop()
0452: throws MuleException {
0453: if (this .isDisposed()) {
0454: return LifecycleTransitionResult.OK;
0455: }
0456:
0457: if (this .isStarted()) {
0458: if (logger.isInfoEnabled()) {
0459: logger.info("Stopping: " + this );
0460: }
0461:
0462: // shutdown our scheduler service
0463: ((ScheduledExecutorService) scheduler.get()).shutdown();
0464:
0465: this .doStop();
0466: started.set(false);
0467:
0468: // Stop all the receivers on this connector (this will cause them to
0469: // disconnect too)
0470: if (receivers != null) {
0471: for (Iterator iterator = receivers.values().iterator(); iterator
0472: .hasNext();) {
0473: MessageReceiver mr = (MessageReceiver) iterator
0474: .next();
0475: if (logger.isDebugEnabled()) {
0476: logger.debug("Stopping receiver on endpoint: "
0477: + mr.getEndpoint().getEndpointURI());
0478: }
0479: mr.stop();
0480: }
0481: }
0482: }
0483:
0484: if (this .isConnected()) {
0485: try {
0486: this .disconnect();
0487: } catch (Exception e) {
0488: // TODO MULE-863: What should we really do?
0489: logger.error("Failed to disconnect: " + e.getMessage(),
0490: e);
0491: }
0492: }
0493:
0494: // make sure the scheduler is gone
0495: scheduler.set(null);
0496:
0497: // we do not need to stop the work managers because they do no harm (will just be idle)
0498: // and will be reused on restart without problems.
0499:
0500: //TODO RM* THis shouldn't be here this.initialised.set(false);
0501: // started=false already issued above right after doStop()
0502: if (logger.isInfoEnabled()) {
0503: logger.info("Stopped: " + this );
0504: }
0505: return LifecycleTransitionResult.OK;
0506: }
0507:
0508: /*
0509: * (non-Javadoc)
0510: *
0511: * @see org.mule.api.transport.Connector#shutdown()
0512: */
0513: public final synchronized void dispose() {
0514: disposing.set(true);
0515:
0516: if (logger.isInfoEnabled()) {
0517: logger.info("Disposing: " + this );
0518: }
0519:
0520: try {
0521: this .stop();
0522: } catch (MuleException e) {
0523: // TODO MULE-863: What should we really do?
0524: logger.warn("Failed to stop during shutdown: "
0525: + e.getMessage(), e);
0526: }
0527:
0528: this .disposeReceivers();
0529: this .disposeDispatchers();
0530: this .disposeRequesters();
0531: this .disposeWorkManagers();
0532:
0533: this .doDispose();
0534: disposed.set(true);
0535: initialised.set(false);
0536:
0537: if (logger.isInfoEnabled()) {
0538: logger.info("Disposed: " + this );
0539: }
0540: }
0541:
0542: protected void initWorkManagers() throws MuleException {
0543: if (receiverWorkManager.get() == null) {
0544: WorkManager newWorkManager = this
0545: .getReceiverThreadingProfile().createWorkManager(
0546: getName() + ".receiver");
0547:
0548: if (receiverWorkManager.compareAndSet(null, newWorkManager)) {
0549: newWorkManager.start();
0550: }
0551: }
0552:
0553: if (dispatcherWorkManager.get() == null) {
0554: WorkManager newWorkManager = this
0555: .getDispatcherThreadingProfile().createWorkManager(
0556: getName() + ".dispatcher");
0557:
0558: if (dispatcherWorkManager.compareAndSet(null,
0559: newWorkManager)) {
0560: newWorkManager.start();
0561: }
0562: }
0563: }
0564:
0565: protected void disposeWorkManagers() {
0566: logger.debug("Disposing dispatcher work manager");
0567: WorkManager workManager = (WorkManager) dispatcherWorkManager
0568: .get();
0569: if (workManager != null) {
0570: workManager.dispose();
0571: }
0572: dispatcherWorkManager.set(null);
0573:
0574: logger.debug("Disposing receiver work manager");
0575: workManager = (WorkManager) receiverWorkManager.get();
0576: if (workManager != null) {
0577: workManager.dispose();
0578: }
0579: receiverWorkManager.set(null);
0580: }
0581:
0582: protected void disposeReceivers() {
0583: if (receivers != null) {
0584: logger.debug("Disposing Receivers");
0585:
0586: for (Iterator iterator = receivers.values().iterator(); iterator
0587: .hasNext();) {
0588: MessageReceiver receiver = (MessageReceiver) iterator
0589: .next();
0590:
0591: try {
0592: this .destroyReceiver(receiver, receiver
0593: .getEndpoint());
0594: } catch (Throwable e) {
0595: // TODO MULE-863: What should we really do?
0596: logger.error("Failed to destroy receiver: "
0597: + receiver, e);
0598: }
0599: }
0600:
0601: receivers.clear();
0602: logger.debug("Receivers Disposed");
0603: }
0604: }
0605:
0606: protected void disposeDispatchers() {
0607: if (dispatchers != null) {
0608: logger.debug("Disposing Dispatchers");
0609: dispatchers.clear();
0610: logger.debug("Dispatchers Disposed");
0611: }
0612: }
0613:
0614: protected void disposeRequesters() {
0615: if (requesters != null) {
0616: logger.debug("Disposing Requesters");
0617: requesters.clear();
0618: logger.debug("Requesters Disposed");
0619: }
0620: }
0621:
0622: /*
0623: * (non-Javadoc)
0624: *
0625: * @see org.mule.api.transport.Connector#isAlive()
0626: */
0627: public boolean isDisposed() {
0628: return disposed.get();
0629: }
0630:
0631: /*
0632: * (non-Javadoc)
0633: *
0634: * @see org.mule.api.transport.Connector#handleException(java.lang.Object,
0635: * java.lang.Throwable)
0636: */
0637: public void handleException(Exception exception) {
0638: if (exceptionListener == null) {
0639: throw new MuleRuntimeException(CoreMessages
0640: .exceptionOnConnectorNotExceptionListener(this
0641: .getName()), exception);
0642: } else {
0643: exceptionListener.exceptionThrown(exception);
0644: }
0645: }
0646:
0647: /*
0648: * (non-Javadoc)
0649: *
0650: * @see org.mule.util.ExceptionListener#onException(java.lang.Throwable)
0651: */
0652: public void exceptionThrown(Exception e) {
0653: handleException(e);
0654: }
0655:
0656: /**
0657: * @return the ExceptionStrategy for this endpoint
0658: * @see ExceptionListener
0659: */
0660: public ExceptionListener getExceptionListener() {
0661: return exceptionListener;
0662: }
0663:
0664: /**
0665: * @param listener the ExceptionStrategy to use with this endpoint
0666: * @see ExceptionListener
0667: */
0668: public void setExceptionListener(ExceptionListener listener) {
0669: exceptionListener = listener;
0670: }
0671:
0672: /**
0673: * @return Returns the dispatcherFactory.
0674: */
0675: public MessageDispatcherFactory getDispatcherFactory() {
0676: return dispatcherFactory;
0677: }
0678:
0679: /**
0680: * @param dispatcherFactory The dispatcherFactory to set.
0681: */
0682: public void setDispatcherFactory(
0683: MessageDispatcherFactory dispatcherFactory) {
0684: KeyedPoolableObjectFactory poolFactory;
0685:
0686: if (dispatcherFactory instanceof KeyedPoolableObjectFactory) {
0687: poolFactory = (KeyedPoolableObjectFactory) dispatcherFactory;
0688: } else {
0689: // need to adapt the factory for use by commons-pool
0690: poolFactory = new KeyedPoolMessageDispatcherFactoryAdapter(
0691: dispatcherFactory);
0692: }
0693:
0694: this .dispatchers.setFactory(poolFactory);
0695:
0696: // we keep a reference to the unadapted factory, otherwise people might end
0697: // up with ClassCastExceptions on downcast to their implementation (sigh)
0698: this .dispatcherFactory = dispatcherFactory;
0699: }
0700:
0701: /**
0702: * @return Returns the requesterFactory.
0703: */
0704: public MessageRequesterFactory getRequesterFactory() {
0705: return requesterFactory;
0706: }
0707:
0708: /**
0709: * @param requesterFactory The requesterFactory to set.
0710: */
0711: public void setRequesterFactory(
0712: MessageRequesterFactory requesterFactory) {
0713: KeyedPoolableObjectFactory poolFactory;
0714:
0715: if (requesterFactory instanceof KeyedPoolableObjectFactory) {
0716: poolFactory = (KeyedPoolableObjectFactory) requesterFactory;
0717: } else {
0718: // need to adapt the factory for use by commons-pool
0719: poolFactory = new KeyedPoolMessageRequesterFactoryAdapter(
0720: requesterFactory);
0721: }
0722:
0723: requesters.setFactory(poolFactory);
0724:
0725: // we keep a reference to the unadapted factory, otherwise people might end
0726: // up with ClassCastExceptions on downcast to their implementation (sigh)
0727: this .requesterFactory = requesterFactory;
0728: }
0729:
0730: /**
0731: * Returns the maximum number of dispatchers that can be concurrently active per
0732: * endpoint.
0733: *
0734: * @return max. number of active dispatchers
0735: */
0736: public int getMaxDispatchersActive() {
0737: return this .dispatchers.getMaxActive();
0738: }
0739:
0740: /**
0741: * Configures the maximum number of dispatchers that can be concurrently active
0742: * per endpoint
0743: *
0744: * @param maxActive max. number of active dispatchers
0745: */
0746: public void setMaxDispatchersActive(int maxActive) {
0747: this .dispatchers.setMaxActive(maxActive);
0748: // adjust maxIdle in tandem to avoid thrashing
0749: this .dispatchers.setMaxIdle(maxActive);
0750: }
0751:
0752: private MessageDispatcher getDispatcher(OutboundEndpoint endpoint)
0753: throws MuleException {
0754: this .checkDisposed();
0755:
0756: if (endpoint == null) {
0757: throw new IllegalArgumentException(
0758: "Endpoint must not be null");
0759: }
0760:
0761: if (!supportsProtocol(endpoint.getConnector().getProtocol())) {
0762: throw new IllegalArgumentException(CoreMessages
0763: .connectorSchemeIncompatibleWithEndpointScheme(
0764: this .getProtocol(),
0765: endpoint.getEndpointURI().toString())
0766: .getMessage());
0767: }
0768:
0769: MessageDispatcher dispatcher = null;
0770: try {
0771: if (logger.isDebugEnabled()) {
0772: logger.debug("Borrowing a dispatcher for endpoint: "
0773: + endpoint.getEndpointURI());
0774: }
0775:
0776: dispatcher = (MessageDispatcher) dispatchers
0777: .borrowObject(endpoint);
0778:
0779: if (logger.isDebugEnabled()) {
0780: logger.debug("Borrowed a dispatcher for endpoint: "
0781: + endpoint.getEndpointURI() + " = "
0782: + dispatcher.toString());
0783: }
0784:
0785: return dispatcher;
0786: } catch (Exception ex) {
0787: throw new ConnectorException(CoreMessages
0788: .connectorCausedError(), this , ex);
0789: } finally {
0790: try {
0791: if (logger.isDebugEnabled()) {
0792: logger.debug("Borrowed dispatcher: "
0793: + ObjectUtils.toString(dispatcher, "null"));
0794: }
0795: } catch (Exception ex) {
0796: throw new ConnectorException(CoreMessages
0797: .connectorCausedError(), this , ex);
0798: }
0799: }
0800: }
0801:
0802: private void returnDispatcher(OutboundEndpoint endpoint,
0803: MessageDispatcher dispatcher) {
0804: if (endpoint != null && dispatcher != null) {
0805: try {
0806: if (logger.isDebugEnabled()) {
0807: logger.debug("Returning dispatcher for endpoint: "
0808: + endpoint.getEndpointURI() + " = "
0809: + dispatcher.toString());
0810: }
0811:
0812: } catch (Exception ex) {
0813: //Logging failed
0814: } finally {
0815: try {
0816: dispatchers.returnObject(endpoint, dispatcher);
0817: } catch (Exception e) {
0818: // TODO MULE-863: What should we really do?
0819: // ignore - if the dispatcher is broken, it will likely get cleaned
0820: // up by the factory
0821: //RM* I think we should at least log this error so give some indication of what is failing
0822: logger
0823: .error(
0824: "Failed to dispose dispatcher for endpoint: "
0825: + endpoint
0826: + ". This will cause a memory leak. Please report to",
0827: e);
0828: }
0829: }
0830: }
0831: }
0832:
0833: /**
0834: * Returns the maximum number of requesters that can be concurrently active per
0835: * endpoint.
0836: *
0837: * @return max. number of active requesters
0838: */
0839: public int getMaxRequestersActive() {
0840: return this .requesters.getMaxActive();
0841: }
0842:
0843: /**
0844: * Configures the maximum number of requesters that can be concurrently active
0845: * per endpoint
0846: *
0847: * @param maxActive max. number of active requesters
0848: */
0849: public void setMaxRequestersActive(int maxActive) {
0850: this .requesters.setMaxActive(maxActive);
0851: // adjust maxIdle in tandem to avoid thrashing
0852: this .requesters.setMaxIdle(maxActive);
0853: }
0854:
0855: private MessageRequester getRequester(InboundEndpoint endpoint)
0856: throws MuleException {
0857: this .checkDisposed();
0858:
0859: if (endpoint == null) {
0860: throw new IllegalArgumentException(
0861: "Endpoint must not be null");
0862: }
0863:
0864: if (!supportsProtocol(endpoint.getConnector().getProtocol())) {
0865: throw new IllegalArgumentException(CoreMessages
0866: .connectorSchemeIncompatibleWithEndpointScheme(
0867: this .getProtocol(),
0868: endpoint.getEndpointURI().toString())
0869: .getMessage());
0870: }
0871:
0872: MessageRequester requester = null;
0873: try {
0874: if (logger.isDebugEnabled()) {
0875: logger.debug("Borrowing a requester for endpoint: "
0876: + endpoint.getEndpointURI());
0877: }
0878:
0879: requester = (MessageRequester) requesters
0880: .borrowObject(endpoint);
0881:
0882: if (logger.isDebugEnabled()) {
0883: logger.debug("Borrowed a requester for endpoint: "
0884: + endpoint.getEndpointURI() + " = "
0885: + requester.toString());
0886: }
0887:
0888: return requester;
0889: } catch (Exception ex) {
0890: throw new ConnectorException(CoreMessages
0891: .connectorCausedError(), this , ex);
0892: } finally {
0893: try {
0894: if (logger.isDebugEnabled()) {
0895: logger.debug("Borrowed requester: "
0896: + ObjectUtils.toString(requester, "null"));
0897: }
0898: } catch (Exception ex) {
0899: throw new ConnectorException(CoreMessages
0900: .connectorCausedError(), this , ex);
0901: }
0902: }
0903: }
0904:
0905: private void returnRequester(InboundEndpoint endpoint,
0906: MessageRequester requester) {
0907: if (endpoint != null && requester != null) {
0908: try {
0909: if (logger.isDebugEnabled()) {
0910: logger.debug("Returning requester for endpoint: "
0911: + endpoint.getEndpointURI() + " = "
0912: + requester.toString());
0913: }
0914:
0915: } catch (Exception ex) {
0916: //Logging failed
0917: } finally {
0918: try {
0919: requesters.returnObject(endpoint, requester);
0920: } catch (Exception e) {
0921: // TODO MULE-863: What should we really do?
0922: // ignore - if the requester is broken, it will likely get cleaned
0923: // up by the factory
0924: //RM* I think we should at least log this error so give some indication of what is failing
0925: logger
0926: .error(
0927: "Failed to dispose requester for endpoint: "
0928: + endpoint
0929: + ". This will cause a memory leak. Please report to",
0930: e);
0931: }
0932: }
0933: }
0934: }
0935:
0936: protected void checkDisposed() throws DisposeException {
0937: if (this .isDisposed()) {
0938: throw new DisposeException(CoreMessages
0939: .cannotUseDisposedConnector(), this );
0940: }
0941: }
0942:
0943: public MessageReceiver registerListener(Service service,
0944: InboundEndpoint endpoint) throws Exception {
0945: if (endpoint == null) {
0946: throw new IllegalArgumentException(
0947: "The endpoint cannot be null when registering a listener");
0948: }
0949:
0950: if (service == null) {
0951: throw new IllegalArgumentException(
0952: "The service cannot be null when registering a listener");
0953: }
0954:
0955: EndpointURI endpointUri = endpoint.getEndpointURI();
0956: if (endpointUri == null) {
0957: throw new ConnectorException(CoreMessages
0958: .endpointIsNullForListener(), this );
0959: }
0960:
0961: logger.info("Registering listener: " + service.getName()
0962: + " on endpointUri: " + endpointUri.toString());
0963:
0964: if (getReceiver(service, endpoint) != null) {
0965: throw new ConnectorException(CoreMessages
0966: .listenerAlreadyRegistered(endpointUri), this );
0967: }
0968:
0969: MessageReceiver receiver = createReceiver(service, endpoint);
0970: Object receiverKey = getReceiverKey(service, endpoint);
0971: receiver.setReceiverKey(receiverKey.toString());
0972: // Since we're managing the creation we also need to initialise
0973: receiver.initialise();
0974: receivers.put(receiverKey, receiver);
0975: // receivers.put(getReceiverKey(service, endpoint), receiver);
0976:
0977: return receiver;
0978: }
0979:
0980: /**
0981: * The method determines the key used to store the receiver against.
0982: *
0983: * @param service the service for which the endpoint is being registered
0984: * @param endpoint the endpoint being registered for the service
0985: * @return the key to store the newly created receiver against
0986: */
0987: protected Object getReceiverKey(Service service,
0988: InboundEndpoint endpoint) {
0989: return StringUtils.defaultIfEmpty(endpoint.getEndpointURI()
0990: .getFilterAddress(), endpoint.getEndpointURI()
0991: .getAddress());
0992: }
0993:
0994: public final void unregisterListener(Service service,
0995: InboundEndpoint endpoint) throws Exception {
0996: if (service == null) {
0997: throw new IllegalArgumentException(
0998: "The service must not be null when you unregister a listener");
0999: }
1000:
1001: if (endpoint == null) {
1002: throw new IllegalArgumentException(
1003: "The endpoint must not be null when you unregister a listener");
1004: }
1005:
1006: EndpointURI endpointUri = endpoint.getEndpointURI();
1007: if (endpointUri == null) {
1008: throw new IllegalArgumentException(
1009: "The endpointUri must not be null when you unregister a listener");
1010: }
1011:
1012: if (logger.isInfoEnabled()) {
1013: logger.info("Removing listener on endpointUri: "
1014: + endpointUri);
1015: }
1016:
1017: if (receivers != null && !receivers.isEmpty()) {
1018: MessageReceiver receiver = (MessageReceiver) receivers
1019: .remove(getReceiverKey(service, endpoint));
1020: if (receiver != null) {
1021: destroyReceiver(receiver, endpoint);
1022: receiver.dispose();
1023: }
1024: }
1025: }
1026:
1027: /**
1028: * Getter for property 'dispatcherThreadingProfile'.
1029: *
1030: * @return Value for property 'dispatcherThreadingProfile'.
1031: */
1032: public ThreadingProfile getDispatcherThreadingProfile() {
1033: if (dispatcherThreadingProfile == null && muleContext != null) {
1034: dispatcherThreadingProfile = muleContext
1035: .getDefaultMessageDispatcherThreadingProfile();
1036: }
1037: return dispatcherThreadingProfile;
1038: }
1039:
1040: /**
1041: * Setter for property 'dispatcherThreadingProfile'.
1042: *
1043: * @param dispatcherThreadingProfile Value to set for property
1044: * 'dispatcherThreadingProfile'.
1045: */
1046: public void setDispatcherThreadingProfile(
1047: ThreadingProfile dispatcherThreadingProfile) {
1048: this .dispatcherThreadingProfile = dispatcherThreadingProfile;
1049: }
1050:
1051: /**
1052: * Getter for property 'requesterThreadingProfile'.
1053: *
1054: * @return Value for property 'requesterThreadingProfile'.
1055: */
1056: public ThreadingProfile getRequesterThreadingProfile() {
1057: if (requesterThreadingProfile == null && muleContext != null) {
1058: requesterThreadingProfile = muleContext
1059: .getDefaultMessageRequesterThreadingProfile();
1060: }
1061: return requesterThreadingProfile;
1062: }
1063:
1064: /**
1065: * Setter for property 'requesterThreadingProfile'.
1066: *
1067: * @param requesterThreadingProfile Value to set for property
1068: * 'requesterThreadingProfile'.
1069: */
1070: public void setRequesterThreadingProfile(
1071: ThreadingProfile requesterThreadingProfile) {
1072: this .requesterThreadingProfile = requesterThreadingProfile;
1073: }
1074:
1075: /**
1076: * Getter for property 'receiverThreadingProfile'.
1077: *
1078: * @return Value for property 'receiverThreadingProfile'.
1079: */
1080: public ThreadingProfile getReceiverThreadingProfile() {
1081: if (receiverThreadingProfile == null && muleContext != null) {
1082: receiverThreadingProfile = muleContext
1083: .getDefaultMessageReceiverThreadingProfile();
1084: }
1085: return receiverThreadingProfile;
1086: }
1087:
1088: /**
1089: * Setter for property 'receiverThreadingProfile'.
1090: *
1091: * @param receiverThreadingProfile Value to set for property
1092: * 'receiverThreadingProfile'.
1093: */
1094: public void setReceiverThreadingProfile(
1095: ThreadingProfile receiverThreadingProfile) {
1096: this .receiverThreadingProfile = receiverThreadingProfile;
1097: }
1098:
1099: public void destroyReceiver(MessageReceiver receiver,
1100: InboundEndpoint endpoint) throws Exception {
1101: receiver.dispose();
1102: }
1103:
1104: protected abstract void doInitialise()
1105: throws InitialisationException;
1106:
1107: /**
1108: * Template method to perform any work when destroying the connectoe
1109: */
1110: protected abstract void doDispose();
1111:
1112: /**
1113: * Template method to perform any work when starting the connectoe
1114: *
1115: * @throws MuleException if the method fails
1116: */
1117: protected abstract void doStart() throws MuleException;
1118:
1119: /**
1120: * Template method to perform any work when stopping the connectoe
1121: *
1122: * @throws MuleException if the method fails
1123: */
1124: protected abstract void doStop() throws MuleException;
1125:
1126: public List getDefaultInboundTransformers() {
1127: if (serviceDescriptor == null) {
1128: throw new RuntimeException(
1129: "serviceDescriptor not initialized");
1130: }
1131: return TransformerUtils
1132: .getDefaultInboundTransformers(serviceDescriptor);
1133: }
1134:
1135: public List getDefaultResponseTransformers() {
1136: if (serviceDescriptor == null) {
1137: throw new RuntimeException(
1138: "serviceDescriptor not initialized");
1139: }
1140: return TransformerUtils
1141: .getDefaultResponseTransformers(serviceDescriptor);
1142: }
1143:
1144: public List getDefaultOutboundTransformers() {
1145: if (serviceDescriptor == null) {
1146: throw new RuntimeException(
1147: "serviceDescriptor not initialized");
1148: }
1149: return TransformerUtils
1150: .getDefaultOutboundTransformers(serviceDescriptor);
1151: }
1152:
1153: /**
1154: * Getter for property 'replyToHandler'.
1155: *
1156: * @return Value for property 'replyToHandler'.
1157: */
1158: public ReplyToHandler getReplyToHandler() {
1159: return new DefaultReplyToHandler(
1160: getDefaultResponseTransformers());
1161: }
1162:
1163: /**
1164: * Fires a server notification to all registered listeners
1165: *
1166: * @param notification the notification to fire.
1167: */
1168: public void fireNotification(ServerNotification notification) {
1169: cachedNotificationHandler.fireNotification(notification);
1170: }
1171:
1172: /**
1173: * Getter for property 'connectionStrategy'.
1174: *
1175: * @return Value for property 'connectionStrategy'.
1176: */
1177: //TODO RM* REMOVE
1178: public ConnectionStrategy getConnectionStrategy() {
1179: // not happy with this but each receiver needs its own instance
1180: // of the connection strategy and using a factory just introduces extra
1181: // implementation
1182: try {
1183: return (ConnectionStrategy) BeanUtils
1184: .cloneBean(connectionStrategy);
1185: } catch (Exception e) {
1186: throw new MuleRuntimeException(CoreMessages
1187: .failedToClone("connectionStrategy"), e);
1188: }
1189: }
1190:
1191: /**
1192: * Setter for property 'connectionStrategy'.
1193: *
1194: * @param connectionStrategy Value to set for property 'connectionStrategy'.
1195: */
1196: public void setConnectionStrategy(
1197: ConnectionStrategy connectionStrategy) {
1198: this .connectionStrategy = connectionStrategy;
1199: }
1200:
1201: /** {@inheritDoc} */
1202: public boolean isDisposing() {
1203: return disposing.get();
1204: }
1205:
1206: public boolean isRemoteSyncEnabled() {
1207: return false;
1208: }
1209:
1210: public boolean isSyncEnabled(String protocol) {
1211: return false;
1212: }
1213:
1214: public MessageReceiver getReceiver(Service service,
1215: InboundEndpoint endpoint) {
1216: if (receivers != null) {
1217: Object key = getReceiverKey(service, endpoint);
1218: if (key != null) {
1219: return (MessageReceiver) receivers.get(key);
1220: } else {
1221: throw new RuntimeException(
1222: "getReceiverKey() returned a null key");
1223: }
1224: } else {
1225: throw new RuntimeException(
1226: "Connector has not been initialized.");
1227: }
1228: }
1229:
1230: /**
1231: * Getter for property 'receivers'.
1232: *
1233: * @return Value for property 'receivers'.
1234: */
1235: public Map getReceivers() {
1236: return Collections.unmodifiableMap(receivers);
1237: }
1238:
1239: public MessageReceiver lookupReceiver(String key) {
1240: if (key != null) {
1241: return (MessageReceiver) receivers.get(key);
1242: } else {
1243: throw new IllegalArgumentException(
1244: "Receiver key must not be null");
1245: }
1246: }
1247:
1248: public MessageReceiver[] getReceivers(String wildcardExpression) {
1249: WildcardFilter filter = new WildcardFilter(wildcardExpression);
1250: filter.setCaseSensitive(false);
1251:
1252: List found = new ArrayList();
1253:
1254: for (Iterator iterator = receivers.entrySet().iterator(); iterator
1255: .hasNext();) {
1256: Map.Entry e = (Map.Entry) iterator.next();
1257: if (filter.accept(e.getKey())) {
1258: found.add(e.getValue());
1259: }
1260: }
1261:
1262: return (MessageReceiver[]) CollectionUtils
1263: .toArrayOfComponentType(found, MessageReceiver.class);
1264: }
1265:
1266: public void connect() throws Exception {
1267: this .checkDisposed();
1268:
1269: if (connected.get()) {
1270: return;
1271: }
1272:
1273: /*
1274: Until the recursive startConnector() -> connect() -> doConnect() -> connect()
1275: calls are unwound between a connector and connection strategy, this call has
1276: to be here, and not below (commented out currently). Otherwise, e.g. WebspherMQ
1277: goes into an endless reconnect thrashing loop, see MULE-1150 for more details.
1278: */
1279: try {
1280: if (connecting.get()) {
1281: this .doConnect();
1282: }
1283: if (connecting.compareAndSet(false, true)) {
1284: if (logger.isDebugEnabled()) {
1285: logger.debug("Connecting: " + this );
1286: }
1287:
1288: connectionStrategy.connect(this );
1289:
1290: logger.info("Connected: " + getConnectionDescription());
1291: // This method calls itself so the connecting flag is set first, then
1292: // the connection is made on the second call
1293: return;
1294: }
1295:
1296: // see the explanation above
1297: //this.doConnect();
1298: connected.set(true);
1299: connecting.set(false);
1300:
1301: this .fireNotification(new ConnectionNotification(this ,
1302: getConnectEventId(),
1303: ConnectionNotification.CONNECTION_CONNECTED));
1304: } catch (Exception e) {
1305: connected.set(false);
1306: connecting.set(false);
1307:
1308: this .fireNotification(new ConnectionNotification(this ,
1309: getConnectEventId(),
1310: ConnectionNotification.CONNECTION_FAILED));
1311:
1312: if (e instanceof ConnectException
1313: || e instanceof FatalConnectException) {
1314: // rethrow
1315: throw e;
1316: } else {
1317: throw new ConnectException(e, this );
1318: }
1319: }
1320:
1321: if (startOnConnect.get()) {
1322: this .start();
1323: }
1324: //TODO RM*. If the connection strategy is called on the receivers, the connector strategy gets called too,
1325: //to ensure its connected. Therefore the connect method on the connector needs to be idempotent and not try
1326: //and connect dispatchers or receivers
1327:
1328: // else
1329: // {
1330: // for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
1331: // {
1332: // MessageReceiver receiver = (MessageReceiver)iterator.next();
1333: // if (logger.isDebugEnabled())
1334: // {
1335: // logger.debug("Connecting receiver on endpoint: "
1336: // + receiver.getEndpoint().getEndpointURI());
1337: // }
1338: // receiver.connect();
1339: // }
1340: // }
1341: }
1342:
1343: public void disconnect() throws Exception {
1344: startOnConnect.set(this .isStarted());
1345:
1346: this .fireNotification(new ConnectionNotification(this ,
1347: getConnectEventId(),
1348: ConnectionNotification.CONNECTION_DISCONNECTED));
1349:
1350: connected.set(false);
1351:
1352: try {
1353: this .doDisconnect();
1354: } finally {
1355: this .stop();
1356: }
1357:
1358: logger.info("Disconnected: " + this .getConnectionDescription());
1359: }
1360:
1361: public String getConnectionDescription() {
1362: return this .toString();
1363: }
1364:
1365: public final boolean isConnected() {
1366: return connected.get();
1367: }
1368:
1369: /**
1370: * Template method where any connections should be made for the connector
1371: *
1372: * @throws Exception
1373: */
1374: protected abstract void doConnect() throws Exception;
1375:
1376: /**
1377: * Template method where any connected resources used by the connector should be
1378: * disconnected
1379: *
1380: * @throws Exception
1381: */
1382: protected abstract void doDisconnect() throws Exception;
1383:
1384: /**
1385: * The resource id used when firing ConnectEvents from this connector
1386: *
1387: * @return the resource id used when firing ConnectEvents from this connector
1388: */
1389: protected String getConnectEventId() {
1390: return getName();
1391: }
1392:
1393: /**
1394: * For better throughput when using TransactedMessageReceivers this will enable a
1395: * number of concurrent receivers, based on the value returned by
1396: * {@link #getNumberOfConcurrentTransactedReceivers()}. This property is used by
1397: * transports that support transactions, specifically receivers that extend the
1398: * TransactedPollingMessageReceiver.
1399: *
1400: * @return true if multiple receivers will be enabled for this connection
1401: */
1402: public boolean isCreateMultipleTransactedReceivers() {
1403: return createMultipleTransactedReceivers;
1404: }
1405:
1406: /**
1407: * @see {@link #isCreateMultipleTransactedReceivers()}
1408: * @param createMultipleTransactedReceivers if true, multiple receivers will be
1409: * created for this connection
1410: */
1411: public void setCreateMultipleTransactedReceivers(
1412: boolean createMultipleTransactedReceivers) {
1413: this .createMultipleTransactedReceivers = createMultipleTransactedReceivers;
1414: }
1415:
1416: /**
1417: * Returns the number of concurrent receivers that will be launched when
1418: * {@link #isCreateMultipleTransactedReceivers()} returns <code>true</code>.
1419: *
1420: * @see #DEFAULT_NUM_CONCURRENT_TX_RECEIVERS
1421: */
1422: public int getNumberOfConcurrentTransactedReceivers() {
1423: return numberOfConcurrentTransactedReceivers;
1424: }
1425:
1426: /**
1427: * @see {@link #getNumberOfConcurrentTransactedReceivers()}
1428: * @param count the number of concurrent transacted receivers to start
1429: */
1430: public void setNumberOfConcurrentTransactedReceivers(int count) {
1431: numberOfConcurrentTransactedReceivers = count;
1432: }
1433:
1434: public void setDynamicNotification(boolean dynamic) {
1435: dynamicNotification = dynamic;
1436: }
1437:
1438: protected void updateCachedNotificationHandler() {
1439: if (null != muleContext) {
1440: if (dynamicNotification) {
1441: cachedNotificationHandler = muleContext
1442: .getNotificationManager();
1443: } else {
1444: cachedNotificationHandler = new OptimisedNotificationHandler(
1445: muleContext.getNotificationManager(),
1446: MessageNotification.class);
1447: }
1448: }
1449: }
1450:
1451: public boolean isEnableMessageEvents() {
1452: return cachedNotificationHandler
1453: .isNotificationEnabled(MessageNotification.class);
1454: }
1455:
1456: /**
1457: * Registers other protocols 'understood' by this connector. These must contain
1458: * scheme meta info. Any protocol registered must begin with the protocol of this
1459: * connector, i.e. If the connector is axis the protocol for jms over axis will
1460: * be axis:jms. Here, 'axis' is the scheme meta info and 'jms' is the protocol.
1461: * If the protocol argument does not start with the connector's protocol, it will
1462: * be appended.
1463: *
1464: * @param protocol the supported protocol to register
1465: */
1466: public void registerSupportedProtocol(String protocol) {
1467: protocol = protocol.toLowerCase();
1468: if (protocol.startsWith(getProtocol().toLowerCase())) {
1469: registerSupportedProtocolWithoutPrefix(protocol);
1470: } else {
1471: supportedProtocols.add(getProtocol().toLowerCase() + ":"
1472: + protocol);
1473: }
1474: }
1475:
1476: /**
1477: * Registers other protocols 'understood' by this connector. These must contain
1478: * scheme meta info. Unlike the {@link #registerSupportedProtocol(String)} method,
1479: * this allows you to register protocols that are not prefixed with the connector
1480: * protocol. This is useful where you use a Service Finder to discover which
1481: * Transport implementation to use. For example the 'wsdl' transport is a generic
1482: * 'finder' transport that will use Axis, Xfire or Glue to create the WSDL
1483: * client. These transport protocols would be wsdl-axis, wsdl-xfire and
1484: * wsdl-glue, but they can all support 'wsdl' protocol too.
1485: *
1486: * @param protocol the supported protocol to register
1487: */
1488: protected void registerSupportedProtocolWithoutPrefix(
1489: String protocol) {
1490: supportedProtocols.add(protocol.toLowerCase());
1491: }
1492:
1493: public void unregisterSupportedProtocol(String protocol) {
1494: protocol = protocol.toLowerCase();
1495: if (protocol.startsWith(getProtocol().toLowerCase())) {
1496: supportedProtocols.remove(protocol);
1497: } else {
1498: supportedProtocols.remove(getProtocol().toLowerCase() + ":"
1499: + protocol);
1500: }
1501: }
1502:
1503: /**
1504: * @return true if the protocol is supported by this connector.
1505: */
1506: public boolean supportsProtocol(String protocol) {
1507: return supportedProtocols.contains(protocol.toLowerCase());
1508: }
1509:
1510: /**
1511: * Returns an unmodifiable list of the protocols supported by this connector
1512: *
1513: * @return an unmodifiable list of the protocols supported by this connector
1514: */
1515: public List getSupportedProtocols() {
1516: return Collections.unmodifiableList(supportedProtocols);
1517: }
1518:
1519: /**
1520: * Sets A list of protocols that the connector can accept
1521: *
1522: * @param supportedProtocols
1523: */
1524: public void setSupportedProtocols(List supportedProtocols) {
1525: for (Iterator iterator = supportedProtocols.iterator(); iterator
1526: .hasNext();) {
1527: String s = (String) iterator.next();
1528: registerSupportedProtocol(s);
1529: }
1530: }
1531:
1532: /**
1533: * Returns a work manager for message receivers.
1534: */
1535: protected WorkManager getReceiverWorkManager(String receiverName)
1536: throws MuleException {
1537: return (WorkManager) receiverWorkManager.get();
1538: }
1539:
1540: /**
1541: * Returns a work manager for message dispatchers.
1542: *
1543: * @throws MuleException in case of error
1544: */
1545: protected WorkManager getDispatcherWorkManager()
1546: throws MuleException {
1547: return (WorkManager) dispatcherWorkManager.get();
1548: }
1549:
1550: /**
1551: * Returns a work manager for message requesters.
1552: *
1553: * @throws MuleException in case of error
1554: */
1555: protected WorkManager getRequesterWorkManager()
1556: throws MuleException {
1557: return (WorkManager) requesterWorkManager.get();
1558: }
1559:
1560: /**
1561: * Returns a Scheduler service for periodic tasks, currently limited to internal
1562: * use. Note: getScheduler() currently conflicts with the same method in the
1563: * Quartz transport
1564: */
1565: public ScheduledExecutorService getScheduler() {
1566: if (scheduler.get() == null) {
1567: ThreadFactory threadFactory = new NamedThreadFactory(this
1568: .getName()
1569: + ".scheduler");
1570: ScheduledThreadPoolExecutor newExecutor = new ScheduledThreadPoolExecutor(
1571: 4, threadFactory);
1572: newExecutor
1573: .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
1574: newExecutor.setKeepAliveTime(this
1575: .getReceiverThreadingProfile().getThreadTTL(),
1576: TimeUnit.MILLISECONDS);
1577: newExecutor.allowCoreThreadTimeOut(true);
1578:
1579: if (!scheduler.compareAndSet(null, newExecutor)) {
1580: // someone else was faster, ditch our copy.
1581: newExecutor.shutdown();
1582: }
1583: }
1584:
1585: return (ScheduledExecutorService) scheduler.get();
1586: }
1587:
1588: /**
1589: * Getter for property 'sessionHandler'.
1590: *
1591: * @return Value for property 'sessionHandler'.
1592: */
1593: public SessionHandler getSessionHandler() {
1594: return sessionHandler;
1595: }
1596:
1597: /**
1598: * Setter for property 'sessionHandler'.
1599: *
1600: * @param sessionHandler Value to set for property 'sessionHandler'.
1601: */
1602: public void setSessionHandler(SessionHandler sessionHandler) {
1603: this .sessionHandler = sessionHandler;
1604: }
1605:
1606: public void workAccepted(WorkEvent event) {
1607: this .handleWorkException(event, "workAccepted");
1608: }
1609:
1610: public void workRejected(WorkEvent event) {
1611: this .handleWorkException(event, "workRejected");
1612: }
1613:
1614: public void workStarted(WorkEvent event) {
1615: this .handleWorkException(event, "workStarted");
1616: }
1617:
1618: public void workCompleted(WorkEvent event) {
1619: this .handleWorkException(event, "workCompleted");
1620: }
1621:
1622: protected void handleWorkException(WorkEvent event, String type) {
1623: if (event == null) {
1624: return;
1625: }
1626:
1627: Throwable e = event.getException();
1628:
1629: if (e == null) {
1630: return;
1631: }
1632:
1633: if (e.getCause() != null) {
1634: e = e.getCause();
1635: }
1636:
1637: logger.error("Work caused exception on '" + type
1638: + "'. Work being executed was: "
1639: + event.getWork().toString());
1640:
1641: if (e instanceof Exception) {
1642: this .handleException((Exception) e);
1643: } else {
1644: throw new MuleRuntimeException(CoreMessages
1645: .connectorCausedError(this .getName()), e);
1646: }
1647: }
1648:
1649: // TODO the following methods should probably be lifecycle-enabled;
1650: // for now they are only stubs to get the refactoring going.
1651:
1652: public void dispatch(OutboundEndpoint endpoint, MuleEvent event)
1653: throws DispatchException {
1654: MessageDispatcher dispatcher = null;
1655:
1656: try {
1657: dispatcher = this .getDispatcher(endpoint);
1658: dispatcher.dispatch(event);
1659: } catch (DispatchException dex) {
1660: throw dex;
1661: } catch (MuleException ex) {
1662: throw new DispatchException(event.getMessage(), endpoint,
1663: ex);
1664: } finally {
1665: this .returnDispatcher(endpoint, dispatcher);
1666: }
1667: }
1668:
1669: /**
1670: * This method will return the dispatcher to the pool or, if the payload is an inputstream,
1671: * replace the payload with a new DelegatingInputStream which returns the dispatcher to
1672: * the pool when the stream is closed.
1673: *
1674: * @param endpoint
1675: * @param dispatcher
1676: * @param result
1677: */
1678: protected void setupDispatchReturn(final OutboundEndpoint endpoint,
1679: final MessageDispatcher dispatcher, MuleMessage result) {
1680: if (result != null
1681: && result.getPayload() instanceof InputStream) {
1682: DelegatingInputStream is = new DelegatingInputStream(
1683: (InputStream) result.getPayload()) {
1684: public void close() throws IOException {
1685: try {
1686: super .close();
1687: } finally {
1688: returnDispatcher(endpoint, dispatcher);
1689: }
1690: }
1691: };
1692: result.setPayload(is);
1693: } else {
1694:
1695: this .returnDispatcher(endpoint, dispatcher);
1696: }
1697: }
1698:
1699: public MuleMessage request(String uri, long timeout)
1700: throws Exception {
1701: return request(getMuleContext().getRegistry()
1702: .lookupEndpointFactory().getInboundEndpoint(uri),
1703: timeout);
1704: }
1705:
1706: public MuleMessage request(InboundEndpoint endpoint, long timeout)
1707: throws Exception {
1708: MessageRequester requester = null;
1709: MuleMessage result = null;
1710: try {
1711: requester = this .getRequester(endpoint);
1712: result = requester.request(timeout);
1713: return result;
1714: } finally {
1715: setupRequestReturn(endpoint, requester, result);
1716: }
1717: }
1718:
1719: /**
1720: * This method will return the requester to the pool or, if the payload is an inputstream,
1721: * replace the payload with a new DelegatingInputStream which returns the requester to
1722: * the pool when the stream is closed.
1723: *
1724: * @param endpoint
1725: * @param requester
1726: * @param result
1727: */
1728: protected void setupRequestReturn(final InboundEndpoint endpoint,
1729: final MessageRequester requester, MuleMessage result) {
1730: if (result != null
1731: && result.getPayload() instanceof InputStream) {
1732: DelegatingInputStream is = new DelegatingInputStream(
1733: (InputStream) result.getPayload()) {
1734: public void close() throws IOException {
1735: try {
1736: super .close();
1737: } finally {
1738: returnRequester(endpoint, requester);
1739: }
1740: }
1741: };
1742: result.setPayload(is);
1743: } else {
1744:
1745: this .returnRequester(endpoint, requester);
1746: }
1747: }
1748:
1749: public MuleMessage send(OutboundEndpoint endpoint, MuleEvent event)
1750: throws DispatchException {
1751: MessageDispatcher dispatcher = null;
1752:
1753: try {
1754: dispatcher = this .getDispatcher(endpoint);
1755: return dispatcher.send(event);
1756: } catch (DispatchException dex) {
1757: throw dex;
1758: } catch (MuleException ex) {
1759: throw new DispatchException(event.getMessage(), endpoint,
1760: ex);
1761: } finally {
1762: this .returnDispatcher(endpoint, dispatcher);
1763: }
1764: }
1765:
1766: // -------- Methods from the removed AbstractServiceEnabled Connector
1767:
1768: /**
1769: * When this connector is created via the
1770: * {@link org.mule.transport.service.TransportFactory} the endpoint used to
1771: * determine the connector type is passed to this method so that any properties
1772: * set on the endpoint that can be used to initialise the connector are made
1773: * available.
1774: *
1775: * @param endpointUri the {@link EndpointURI} use to create this connector
1776: * @throws InitialisationException If there are any problems with the
1777: * configuration set on the Endpoint or if another exception is
1778: * thrown it is wrapped in an InitialisationException.
1779: */
1780: public void initialiseFromUrl(EndpointURI endpointUri)
1781: throws InitialisationException {
1782: if (!supportsProtocol(endpointUri.getFullScheme())) {
1783: throw new InitialisationException(CoreMessages
1784: .schemeNotCompatibleWithConnector(endpointUri
1785: .getFullScheme(), this .getClass()), this );
1786: }
1787: Properties props = new Properties();
1788: props.putAll(endpointUri.getParams());
1789: // auto set username and password
1790: if (endpointUri.getUserInfo() != null) {
1791: props.setProperty("username", endpointUri.getUser());
1792: String passwd = endpointUri.getPassword();
1793: if (passwd != null) {
1794: props.setProperty("password", passwd);
1795: }
1796: }
1797: String host = endpointUri.getHost();
1798: if (host != null) {
1799: props.setProperty("hostname", host);
1800: props.setProperty("host", host);
1801: }
1802: if (endpointUri.getPort() > -1) {
1803: props.setProperty("port", String.valueOf(endpointUri
1804: .getPort()));
1805: }
1806:
1807: org.mule.util.BeanUtils.populateWithoutFail(this , props, true);
1808:
1809: setName(ObjectNameHelper.getConnectorName(this ));
1810: //initialise();
1811: }
1812:
1813: /**
1814: * Initialises this connector from its {@link TransportServiceDescriptor} This
1815: * will be called before the {@link #doInitialise()} method is called.
1816: *
1817: * @throws InitialisationException InitialisationException If there are any
1818: * problems with the configuration or if another exception is thrown
1819: * it is wrapped in an InitialisationException.
1820: */
1821: protected synchronized void initFromServiceDescriptor()
1822: throws InitialisationException {
1823: try {
1824: serviceDescriptor = (TransportServiceDescriptor) RegistryContext
1825: .getRegistry()
1826: .lookupServiceDescriptor(
1827: ServiceDescriptorFactory.PROVIDER_SERVICE_TYPE,
1828: getProtocol().toLowerCase(),
1829: serviceOverrides);
1830: if (serviceDescriptor == null) {
1831: throw new ServiceException(CoreMessages
1832: .noServiceTransportDescriptor(getProtocol()));
1833: }
1834:
1835: if (logger.isDebugEnabled()) {
1836: logger
1837: .debug("Loading DispatcherFactory for connector: "
1838: + getName()
1839: + " ("
1840: + getClass().getName() + ")");
1841: }
1842:
1843: MessageDispatcherFactory df = serviceDescriptor
1844: .createDispatcherFactory();
1845: if (df != null) {
1846: this .setDispatcherFactory(df);
1847: } else if (logger.isDebugEnabled()) {
1848: logger.debug("Transport '" + getProtocol()
1849: + "' will not support outbound endpoints: ");
1850: }
1851:
1852: if (logger.isDebugEnabled()) {
1853: logger
1854: .debug("Loading RequesterFactory for connector: "
1855: + getName()
1856: + " ("
1857: + getClass().getName() + ")");
1858: }
1859:
1860: MessageRequesterFactory rf = serviceDescriptor
1861: .createRequesterFactory();
1862: if (rf != null) {
1863: this .setRequesterFactory(rf);
1864: } else if (logger.isDebugEnabled()) {
1865: logger.debug("Transport '" + getProtocol()
1866: + "' will not support requests: ");
1867: }
1868:
1869: sessionHandler = serviceDescriptor.createSessionHandler();
1870:
1871: // TODO Do we still need to support this for 2.x?
1872: // Set any manager default properties for the connector. These are set on
1873: // the Manager with a protocol e.g. jms.specification=1.1
1874: // This provides a really convenient way to set properties on an object
1875: // from unit tests
1876: // Map props = new HashMap();
1877: // PropertiesUtils.getPropertiesWithPrefix(muleContext.getRegistry().lookupProperties(), getProtocol()
1878: // .toLowerCase(), props);
1879: // if (props.size() > 0)
1880: // {
1881: // props = PropertiesUtils.removeNamespaces(props);
1882: // org.mule.util.BeanUtils.populateWithoutFail(this, props, true);
1883: // }
1884: } catch (Exception e) {
1885: throw new InitialisationException(e, this );
1886: }
1887: }
1888:
1889: /**
1890: * Get the {@link TransportServiceDescriptor} for this connector. This will be
1891: * null if the connector was created by the developer. To create a connector the
1892: * proper way the developer should use the {@link TransportFactory} and pass in
1893: * an endpoint.
1894: *
1895: * @return the {@link TransportServiceDescriptor} for this connector
1896: */
1897: protected TransportServiceDescriptor getServiceDescriptor() {
1898: if (serviceDescriptor == null) {
1899: throw new IllegalStateException(
1900: "This connector has not yet been initialised: "
1901: + name);
1902: }
1903: return serviceDescriptor;
1904: }
1905:
1906: /**
1907: * Create a Message receiver for this connector
1908: *
1909: * @param service the service that will receive events from this receiver,
1910: * the listener
1911: * @param endpoint the endpoint that defies this inbound communication
1912: * @return an instance of the message receiver defined in this connectors'
1913: * {@link org.mule.transport.service.TransportServiceDescriptor}
1914: * initialised using the service and endpoint.
1915: * @throws Exception if there is a problem creating the receiver. This exception
1916: * really depends on the underlying transport, thus any exception
1917: * could be thrown
1918: */
1919: protected MessageReceiver createReceiver(Service service,
1920: InboundEndpoint endpoint) throws Exception {
1921: return getServiceDescriptor().createMessageReceiver(this ,
1922: service, endpoint);
1923: }
1924:
1925: /**
1926: * Gets a <code>MessageAdapter</code> for the endpoint for the given message
1927: * (data)
1928: *
1929: * @param message the data with which to initialise the
1930: * <code>MessageAdapter</code>
1931: * @return the <code>MessageAdapter</code> for the endpoint
1932: * @throws org.mule.api.MessagingException if the message parameter is not
1933: * supported
1934: * @see org.mule.api.transport.MessageAdapter
1935: */
1936: public MessageAdapter getMessageAdapter(Object message)
1937: throws MessagingException {
1938: try {
1939: return serviceDescriptor.createMessageAdapter(message);
1940: } catch (TransportServiceException e) {
1941: throw new MessagingException(CoreMessages
1942: .failedToCreate("Message Adapter"), message, e);
1943: }
1944: }
1945:
1946: /**
1947: * A map of fully qualified class names that should override those in the
1948: * connectors' service descriptor This map will be null if there are no overrides
1949: *
1950: * @return a map of override values or null
1951: */
1952: public Map getServiceOverrides() {
1953: return serviceOverrides;
1954: }
1955:
1956: /**
1957: * Set the Service overrides on this connector.
1958: *
1959: * @param serviceOverrides the override values to use
1960: */
1961: public void setServiceOverrides(Map serviceOverrides) {
1962: this .serviceOverrides = new Properties();
1963: this .serviceOverrides.putAll(serviceOverrides);
1964: }
1965:
1966: /**
1967: * Will get the output stream for this type of transport. Typically this
1968: * will be called only when Streaming is being used on an outbound endpoint.
1969: * If Streaming is not supported by this transport an {@link UnsupportedOperationException}
1970: * is thrown. Note that the stream MUST release resources on close. For help doing so, see
1971: * {@link org.mule.model.streaming.CallbackOutputStream}.
1972: *
1973: * @param endpoint the endpoint that releates to this Dispatcher
1974: * @param message the current message being processed
1975: * @return the output stream to use for this request
1976: * @throws MuleException in case of any error
1977: */
1978: public OutputStream getOutputStream(OutboundEndpoint endpoint,
1979: MuleMessage message) throws MuleException {
1980: throw new UnsupportedOperationException(CoreMessages
1981: .streamingNotSupported(this .getProtocol()).toString());
1982: }
1983:
1984: public MuleContext getMuleContext() {
1985: return muleContext;
1986: }
1987:
1988: public void setMuleContext(MuleContext context) {
1989: this .muleContext = context;
1990: updateCachedNotificationHandler();
1991: }
1992:
1993: // @Override
1994: public String toString() {
1995: final StringBuffer sb = new StringBuffer(120);
1996: sb.append(ClassUtils.getSimpleName(this .getClass()));
1997: sb.append("{this=").append(
1998: Integer.toHexString(System.identityHashCode(this )));
1999: sb.append(", started=").append(started);
2000: sb.append(", initialised=").append(initialised);
2001: sb.append(", name='").append(name).append('\'');
2002: sb.append(", disposed=").append(disposed);
2003: sb.append(", numberOfConcurrentTransactedReceivers=").append(
2004: numberOfConcurrentTransactedReceivers);
2005: sb.append(", createMultipleTransactedReceivers=").append(
2006: createMultipleTransactedReceivers);
2007: sb.append(", connected=").append(connected);
2008: sb.append(", supportedProtocols=").append(supportedProtocols);
2009: sb.append(", serviceOverrides=").append(serviceOverrides);
2010: sb.append('}');
2011: return sb.toString();
2012: }
2013: }
|