Source Code Cross Referenced for AbstractConnector.java in  » ESB » mule » org » mule » transport » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » mule » org.mule.transport 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * $Id: AbstractConnector.java 11343 2008-03-13 10:58:26Z tcarlson $
0003:         * --------------------------------------------------------------------------------------
0004:         * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com
0005:         *
0006:         * The software in this package is published under the terms of the CPAL v1.0
0007:         * license, a copy of which has been included with this distribution in the
0008:         * LICENSE.txt file.
0009:         */
0010:
0011:        package org.mule.transport;
0012:
0013:        import org.mule.DefaultExceptionStrategy;
0014:        import org.mule.MuleSessionHandler;
0015:        import org.mule.RegistryContext;
0016:        import org.mule.api.MessagingException;
0017:        import org.mule.api.MuleContext;
0018:        import org.mule.api.MuleEvent;
0019:        import org.mule.api.MuleException;
0020:        import org.mule.api.MuleMessage;
0021:        import org.mule.api.MuleRuntimeException;
0022:        import org.mule.api.config.ThreadingProfile;
0023:        import org.mule.api.context.WorkManager;
0024:        import org.mule.api.context.notification.ServerNotification;
0025:        import org.mule.api.context.notification.ServerNotificationHandler;
0026:        import org.mule.api.endpoint.EndpointURI;
0027:        import org.mule.api.endpoint.InboundEndpoint;
0028:        import org.mule.api.endpoint.OutboundEndpoint;
0029:        import org.mule.api.lifecycle.DisposeException;
0030:        import org.mule.api.lifecycle.InitialisationException;
0031:        import org.mule.api.lifecycle.LifecycleTransitionResult;
0032:        import org.mule.api.registry.ServiceDescriptorFactory;
0033:        import org.mule.api.registry.ServiceException;
0034:        import org.mule.api.service.Service;
0035:        import org.mule.api.transport.Connectable;
0036:        import org.mule.api.transport.ConnectionStrategy;
0037:        import org.mule.api.transport.Connector;
0038:        import org.mule.api.transport.ConnectorException;
0039:        import org.mule.api.transport.DispatchException;
0040:        import org.mule.api.transport.MessageAdapter;
0041:        import org.mule.api.transport.MessageDispatcher;
0042:        import org.mule.api.transport.MessageDispatcherFactory;
0043:        import org.mule.api.transport.MessageReceiver;
0044:        import org.mule.api.transport.MessageRequester;
0045:        import org.mule.api.transport.MessageRequesterFactory;
0046:        import org.mule.api.transport.ReplyToHandler;
0047:        import org.mule.api.transport.SessionHandler;
0048:        import org.mule.config.i18n.CoreMessages;
0049:        import org.mule.context.notification.ConnectionNotification;
0050:        import org.mule.context.notification.MessageNotification;
0051:        import org.mule.context.notification.OptimisedNotificationHandler;
0052:        import org.mule.lifecycle.AlreadyInitialisedException;
0053:        import org.mule.model.streaming.DelegatingInputStream;
0054:        import org.mule.routing.filters.WildcardFilter;
0055:        import org.mule.transformer.TransformerUtils;
0056:        import org.mule.transport.service.TransportFactory;
0057:        import org.mule.transport.service.TransportServiceDescriptor;
0058:        import org.mule.transport.service.TransportServiceException;
0059:        import org.mule.util.BeanUtils;
0060:        import org.mule.util.ClassUtils;
0061:        import org.mule.util.CollectionUtils;
0062:        import org.mule.util.ObjectNameHelper;
0063:        import org.mule.util.ObjectUtils;
0064:        import org.mule.util.StringUtils;
0065:        import org.mule.util.concurrent.NamedThreadFactory;
0066:        import org.mule.util.concurrent.WaitableBoolean;
0067:
0068:        import java.beans.ExceptionListener;
0069:        import java.io.IOException;
0070:        import java.io.InputStream;
0071:        import java.io.OutputStream;
0072:        import java.util.ArrayList;
0073:        import java.util.Collections;
0074:        import java.util.Iterator;
0075:        import java.util.List;
0076:        import java.util.Map;
0077:        import java.util.Properties;
0078:
0079:        import javax.resource.spi.work.WorkEvent;
0080:        import javax.resource.spi.work.WorkListener;
0081:
0082:        import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
0083:        import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
0084:        import edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService;
0085:        import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
0086:        import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
0087:        import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
0088:        import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
0089:        import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
0090:
0091:        import org.apache.commons.logging.Log;
0092:        import org.apache.commons.logging.LogFactory;
0093:        import org.apache.commons.pool.KeyedPoolableObjectFactory;
0094:        import org.apache.commons.pool.impl.GenericKeyedObjectPool;
0095:
0096:        /**
0097:         * <code>AbstractConnector</code> provides base functionality for all connectors
0098:         * provided with Mule. Connectors are the mechanism used to connect to external
0099:         * systems and protocols in order to send and receive data. <p/> The
0100:         * <code>AbstractConnector</code> provides getter and setter methods for endpoint
0101:         * name, transport name and protocol. It also provides methods to stop and start
0102:         * connectors and sets up a dispatcher threadpool which allows deriving connectors
0103:         * the possibility to dispatch work to separate threads. This functionality is
0104:         * controlled with the <i> doThreading</i> property on the threadingProfiles for
0105:         * dispatchers and receivers. The lifecycle for a connector is -
0106:         * <ol>
0107:         * <li>Create
0108:         * <li>Initialise
0109:         * <li>Connect
0110:         * <li>Connect receivers
0111:         * <li>Start
0112:         * <li>Start Receivers
0113:         * <li>Stop
0114:         * <li>Stop Receivers
0115:         * <li>Disconnect
0116:         * <li>Disconnect Receivers
0117:         * <li>Dispose
0118:         * <li>Dispose Receivers
0119:         * </ol>
0120:         */
0121:        public abstract class AbstractConnector implements  Connector,
0122:                ExceptionListener, Connectable, WorkListener {
0123:            /**
0124:             * Default number of concurrent transactional receivers.
0125:             */
0126:            public static final int DEFAULT_NUM_CONCURRENT_TX_RECEIVERS = 4;
0127:
0128:            /**
0129:             * logger used by this class
0130:             */
0131:            protected final Log logger = LogFactory.getLog(getClass());
0132:
0133:            /**
0134:             * Specifies if the endpoint started
0135:             */
0136:            protected final AtomicBoolean started = new AtomicBoolean(false);
0137:
0138:            /**
0139:             * True once the endpoint has been initialised
0140:             */
0141:            protected final AtomicBoolean initialised = new AtomicBoolean(false);
0142:
0143:            /**
0144:             * The name that identifies the endpoint
0145:             */
0146:            protected volatile String name;
0147:
0148:            /**
0149:             * The exception strategy used by this connector
0150:             */
0151:            protected volatile ExceptionListener exceptionListener;
0152:
0153:            /**
0154:             * True once the connector has been disposed
0155:             */
0156:            protected final AtomicBoolean disposed = new AtomicBoolean(false);
0157:
0158:            /**
0159:             * Determines if the connector has been told to dispose
0160:             */
0161:            protected final AtomicBoolean disposing = new AtomicBoolean(false);
0162:
0163:            /**
0164:             * Factory used to create dispatchers for this connector
0165:             */
0166:            protected volatile MessageDispatcherFactory dispatcherFactory;
0167:
0168:            /**
0169:             * Factory used to create requesters for this connector
0170:             */
0171:            protected volatile MessageRequesterFactory requesterFactory;
0172:
0173:            /**
0174:             * A pool of dispatchers for this connector, keyed by endpoint
0175:             */
0176:            protected final GenericKeyedObjectPool dispatchers = new GenericKeyedObjectPool();
0177:
0178:            /**
0179:             * A pool of requesters for this connector, keyed by endpoint
0180:             */
0181:            protected final GenericKeyedObjectPool requesters = new GenericKeyedObjectPool();
0182:
0183:            /**
0184:             * The collection of listeners on this connector. Keyed by entrypoint
0185:             */
0186:            protected final ConcurrentMap receivers = new ConcurrentHashMap();
0187:
0188:            /**
0189:             * Defines the dispatcher threading profile
0190:             */
0191:            private volatile ThreadingProfile dispatcherThreadingProfile;
0192:
0193:            /**
0194:             * Defines the requester threading profile
0195:             */
0196:            private volatile ThreadingProfile requesterThreadingProfile;
0197:
0198:            /**
0199:             * Defines the receiver threading profile
0200:             */
0201:            private volatile ThreadingProfile receiverThreadingProfile;
0202:
0203:            /**
0204:             * @see {@link #isCreateMultipleTransactedReceivers()}
0205:             */
0206:            protected volatile boolean createMultipleTransactedReceivers = true;
0207:
0208:            /**
0209:             * @see {@link #getNumberOfConcurrentTransactedReceivers()}
0210:             */
0211:            protected volatile int numberOfConcurrentTransactedReceivers = DEFAULT_NUM_CONCURRENT_TX_RECEIVERS;
0212:
0213:            protected volatile ConnectionStrategy connectionStrategy;
0214:
0215:            protected final WaitableBoolean connected = new WaitableBoolean(
0216:                    false);
0217:
0218:            protected final WaitableBoolean connecting = new WaitableBoolean(
0219:                    false);
0220:
0221:            /**
0222:             * If the connect method was called via the start method, this will be set so
0223:             * that when the connector comes on line it will be started
0224:             */
0225:            protected final WaitableBoolean startOnConnect = new WaitableBoolean(
0226:                    false);
0227:
0228:            /**
0229:             * Optimise the handling of message notifications.  If dynamic is set to false then the
0230:             * cached notification handler implements a shortcut for message notifications.
0231:             */
0232:            private boolean dynamicNotification = false;
0233:            private ServerNotificationHandler cachedNotificationHandler;
0234:
0235:            private final List supportedProtocols;
0236:
0237:            /**
0238:             * A shared work manager for all receivers registered with this connector.
0239:             */
0240:            private final AtomicReference/*<WorkManager>*/receiverWorkManager = new AtomicReference();
0241:
0242:            /**
0243:             * A shared work manager for all dispatchers created for this connector.
0244:             */
0245:            private final AtomicReference/*<WorkManager>*/dispatcherWorkManager = new AtomicReference();
0246:
0247:            /**
0248:             * A shared work manager for all requesters created for this connector.
0249:             */
0250:            private final AtomicReference/*<WorkManager>*/requesterWorkManager = new AtomicReference();
0251:
0252:            /**
0253:             * A generic scheduling service for tasks that need to be performed periodically.
0254:             */
0255:            private final AtomicReference/*<ScheduledExecutorService>*/scheduler = new AtomicReference();
0256:
0257:            /**
0258:             * Holds the service configuration for this connector
0259:             */
0260:            protected volatile TransportServiceDescriptor serviceDescriptor;
0261:
0262:            /**
0263:             * The map of service overrides that can be used to extend the capabilities of the
0264:             * connector
0265:             */
0266:            protected volatile Properties serviceOverrides;
0267:
0268:            /**
0269:             * The strategy used for reading and writing session information to and from the
0270:             * transport
0271:             */
0272:            protected volatile SessionHandler sessionHandler = new MuleSessionHandler();
0273:
0274:            protected MuleContext muleContext;
0275:
0276:            public AbstractConnector() {
0277:                setDynamicNotification(false);
0278:
0279:                // always add at least the default protocol
0280:                supportedProtocols = new ArrayList();
0281:                supportedProtocols.add(getProtocol().toLowerCase());
0282:
0283:                connectionStrategy = new SingleAttemptConnectionStrategy();
0284:
0285:                // TODO dispatcher pool configuration should be extracted, maybe even
0286:                // moved into the factory?
0287:                // NOTE: testOnBorrow MUST be FALSE. this is a bit of a design bug in
0288:                // commons-pool since validate is used for both activation and passivation,
0289:                // but has no way of knowing which way it is going.
0290:                dispatchers.setTestOnBorrow(false);
0291:                dispatchers.setTestOnReturn(true);
0292:                requesters.setTestOnBorrow(false);
0293:                requesters.setTestOnReturn(true);
0294:            }
0295:
0296:            /*
0297:             * (non-Javadoc)
0298:             *
0299:             * @see org.mule.transport.UMOConnector#getName()
0300:             */
0301:            public String getName() {
0302:                return name;
0303:            }
0304:
0305:            /*
0306:             * (non-Javadoc)
0307:             *
0308:             * @see org.mule.transport.UMOConnector#setName(java.lang.String)
0309:             */
0310:            public void setName(String newName) {
0311:                if (newName == null) {
0312:                    throw new IllegalArgumentException(CoreMessages
0313:                            .objectIsNull("Connector name").toString());
0314:                }
0315:
0316:                if (logger.isDebugEnabled()) {
0317:                    logger.debug("Set Connector name to: " + newName);
0318:                }
0319:
0320:                name = newName;
0321:            }
0322:
0323:            /*
0324:             * (non-Javadoc)
0325:             *
0326:             * @see org.mule.transport.UMOConnector#create(java.util.HashMap)
0327:             */
0328:            public final synchronized LifecycleTransitionResult initialise()
0329:                    throws InitialisationException {
0330:                if (initialised.get()) {
0331:                    InitialisationException e = new AlreadyInitialisedException(
0332:                            "Connector '" + getProtocol() + "." + getName()
0333:                                    + "'", this );
0334:                    throw e;
0335:                    // Just log a warning since initializing twice is bad but might not be the end of the world.
0336:                    //logger.warn(e);
0337:                }
0338:
0339:                if (logger.isInfoEnabled()) {
0340:                    logger.info("Initialising: " + this );
0341:                }
0342:
0343:                // Use lazy-init (in get() methods) for this instead.
0344:                //dispatcherThreadingProfile = muleContext.getDefaultMessageDispatcherThreadingProfile();
0345:                //requesterThreadingProfile = muleContext.getDefaultMessageRequesterThreadingProfile();
0346:                //receiverThreadingProfile = muleContext.getDefaultMessageReceiverThreadingProfile();
0347:
0348:                // Initialise the structure of this connector
0349:                this .initFromServiceDescriptor();
0350:
0351:                this .doInitialise();
0352:
0353:                // We do the management context injection here just in case we're using a default ExceptionStrategy
0354:                //We always create a default just in case anything goes wrong before
0355:                if (exceptionListener == null) {
0356:                    exceptionListener = new DefaultExceptionStrategy();
0357:                    ((DefaultExceptionStrategy) exceptionListener)
0358:                            .setMuleContext(muleContext);
0359:                    ((DefaultExceptionStrategy) exceptionListener).initialise();
0360:                }
0361:
0362:                try {
0363:                    initWorkManagers();
0364:                } catch (MuleException e) {
0365:                    throw new InitialisationException(e, this );
0366:                }
0367:
0368:                initialised.set(true);
0369:
0370:                return LifecycleTransitionResult.OK;
0371:            }
0372:
0373:            /*
0374:             * (non-Javadoc)
0375:             *
0376:             * @see org.mule.api.transport.Connector#start()
0377:             */
0378:            public final synchronized LifecycleTransitionResult start()
0379:                    throws MuleException {
0380:                this .checkDisposed();
0381:
0382:                if (!this .isStarted()) {
0383:                    //TODO: Not sure about this.  Right now the connector will connect only once
0384:                    // there is an endpoint associated with it and that endpoint is connected.
0385:                    // Do we also need the option of connecting the connector without any endpoints?
0386:                    //            if (!this.isConnected())
0387:                    //            {
0388:                    //                startOnConnect.set(true);
0389:                    //                this.getConnectionStrategy().connect(this);
0390:                    //                // Only start once we are connected
0391:                    //                return;
0392:                    //            }
0393:                    if (!this .isConnected()) {
0394:                        startOnConnect.set(true);
0395:                        // Don't call getConnectionStrategy(), it clones the connection strategy.
0396:                        // Connectors should have a single reconnection thread, unlike per receiver/dispatcher
0397:                        connectionStrategy.connect(this );
0398:                        // Only start once we are connected
0399:                        return LifecycleTransitionResult.OK;
0400:                    }
0401:
0402:                    if (logger.isInfoEnabled()) {
0403:                        logger.info("Starting: " + this );
0404:                    }
0405:
0406:                    // the scheduler is recreated after stop()
0407:                    ScheduledExecutorService currentScheduler = (ScheduledExecutorService) scheduler
0408:                            .get();
0409:                    if (currentScheduler == null
0410:                            || currentScheduler.isShutdown()) {
0411:                        scheduler.set(this .getScheduler());
0412:                    }
0413:
0414:                    this .doStart();
0415:                    started.set(true);
0416:
0417:                    if (receivers != null) {
0418:                        for (Iterator iterator = receivers.values().iterator(); iterator
0419:                                .hasNext();) {
0420:                            MessageReceiver mr = (MessageReceiver) iterator
0421:                                    .next();
0422:                            if (logger.isDebugEnabled()) {
0423:                                logger.debug("Starting receiver on endpoint: "
0424:                                        + mr.getEndpoint().getEndpointURI());
0425:                            }
0426:                            mr.start();
0427:                        }
0428:                    }
0429:
0430:                    if (logger.isInfoEnabled()) {
0431:                        logger.info("Started: " + this );
0432:                    }
0433:                }
0434:                return LifecycleTransitionResult.OK;
0435:            }
0436:
0437:            /*
0438:             * (non-Javadoc)
0439:             *
0440:             * @see org.mule.api.transport.Connector#isStarted()
0441:             */
0442:            public boolean isStarted() {
0443:                return started.get();
0444:            }
0445:
0446:            /*
0447:             * (non-Javadoc)
0448:             *
0449:             * @see org.mule.api.transport.Connector#stop()
0450:             */
0451:            public final synchronized LifecycleTransitionResult stop()
0452:                    throws MuleException {
0453:                if (this .isDisposed()) {
0454:                    return LifecycleTransitionResult.OK;
0455:                }
0456:
0457:                if (this .isStarted()) {
0458:                    if (logger.isInfoEnabled()) {
0459:                        logger.info("Stopping: " + this );
0460:                    }
0461:
0462:                    // shutdown our scheduler service
0463:                    ((ScheduledExecutorService) scheduler.get()).shutdown();
0464:
0465:                    this .doStop();
0466:                    started.set(false);
0467:
0468:                    // Stop all the receivers on this connector (this will cause them to
0469:                    // disconnect too)
0470:                    if (receivers != null) {
0471:                        for (Iterator iterator = receivers.values().iterator(); iterator
0472:                                .hasNext();) {
0473:                            MessageReceiver mr = (MessageReceiver) iterator
0474:                                    .next();
0475:                            if (logger.isDebugEnabled()) {
0476:                                logger.debug("Stopping receiver on endpoint: "
0477:                                        + mr.getEndpoint().getEndpointURI());
0478:                            }
0479:                            mr.stop();
0480:                        }
0481:                    }
0482:                }
0483:
0484:                if (this .isConnected()) {
0485:                    try {
0486:                        this .disconnect();
0487:                    } catch (Exception e) {
0488:                        // TODO MULE-863: What should we really do?
0489:                        logger.error("Failed to disconnect: " + e.getMessage(),
0490:                                e);
0491:                    }
0492:                }
0493:
0494:                // make sure the scheduler is gone
0495:                scheduler.set(null);
0496:
0497:                // we do not need to stop the work managers because they do no harm (will just be idle)
0498:                // and will be reused on restart without problems.
0499:
0500:                //TODO RM* THis shouldn't be here this.initialised.set(false);
0501:                // started=false already issued above right after doStop()
0502:                if (logger.isInfoEnabled()) {
0503:                    logger.info("Stopped: " + this );
0504:                }
0505:                return LifecycleTransitionResult.OK;
0506:            }
0507:
0508:            /*
0509:             * (non-Javadoc)
0510:             *
0511:             * @see org.mule.api.transport.Connector#shutdown()
0512:             */
0513:            public final synchronized void dispose() {
0514:                disposing.set(true);
0515:
0516:                if (logger.isInfoEnabled()) {
0517:                    logger.info("Disposing: " + this );
0518:                }
0519:
0520:                try {
0521:                    this .stop();
0522:                } catch (MuleException e) {
0523:                    // TODO MULE-863: What should we really do?
0524:                    logger.warn("Failed to stop during shutdown: "
0525:                            + e.getMessage(), e);
0526:                }
0527:
0528:                this .disposeReceivers();
0529:                this .disposeDispatchers();
0530:                this .disposeRequesters();
0531:                this .disposeWorkManagers();
0532:
0533:                this .doDispose();
0534:                disposed.set(true);
0535:                initialised.set(false);
0536:
0537:                if (logger.isInfoEnabled()) {
0538:                    logger.info("Disposed: " + this );
0539:                }
0540:            }
0541:
0542:            protected void initWorkManagers() throws MuleException {
0543:                if (receiverWorkManager.get() == null) {
0544:                    WorkManager newWorkManager = this 
0545:                            .getReceiverThreadingProfile().createWorkManager(
0546:                                    getName() + ".receiver");
0547:
0548:                    if (receiverWorkManager.compareAndSet(null, newWorkManager)) {
0549:                        newWorkManager.start();
0550:                    }
0551:                }
0552:
0553:                if (dispatcherWorkManager.get() == null) {
0554:                    WorkManager newWorkManager = this 
0555:                            .getDispatcherThreadingProfile().createWorkManager(
0556:                                    getName() + ".dispatcher");
0557:
0558:                    if (dispatcherWorkManager.compareAndSet(null,
0559:                            newWorkManager)) {
0560:                        newWorkManager.start();
0561:                    }
0562:                }
0563:            }
0564:
0565:            protected void disposeWorkManagers() {
0566:                logger.debug("Disposing dispatcher work manager");
0567:                WorkManager workManager = (WorkManager) dispatcherWorkManager
0568:                        .get();
0569:                if (workManager != null) {
0570:                    workManager.dispose();
0571:                }
0572:                dispatcherWorkManager.set(null);
0573:
0574:                logger.debug("Disposing receiver work manager");
0575:                workManager = (WorkManager) receiverWorkManager.get();
0576:                if (workManager != null) {
0577:                    workManager.dispose();
0578:                }
0579:                receiverWorkManager.set(null);
0580:            }
0581:
0582:            protected void disposeReceivers() {
0583:                if (receivers != null) {
0584:                    logger.debug("Disposing Receivers");
0585:
0586:                    for (Iterator iterator = receivers.values().iterator(); iterator
0587:                            .hasNext();) {
0588:                        MessageReceiver receiver = (MessageReceiver) iterator
0589:                                .next();
0590:
0591:                        try {
0592:                            this .destroyReceiver(receiver, receiver
0593:                                    .getEndpoint());
0594:                        } catch (Throwable e) {
0595:                            // TODO MULE-863: What should we really do?
0596:                            logger.error("Failed to destroy receiver: "
0597:                                    + receiver, e);
0598:                        }
0599:                    }
0600:
0601:                    receivers.clear();
0602:                    logger.debug("Receivers Disposed");
0603:                }
0604:            }
0605:
0606:            protected void disposeDispatchers() {
0607:                if (dispatchers != null) {
0608:                    logger.debug("Disposing Dispatchers");
0609:                    dispatchers.clear();
0610:                    logger.debug("Dispatchers Disposed");
0611:                }
0612:            }
0613:
0614:            protected void disposeRequesters() {
0615:                if (requesters != null) {
0616:                    logger.debug("Disposing Requesters");
0617:                    requesters.clear();
0618:                    logger.debug("Requesters Disposed");
0619:                }
0620:            }
0621:
0622:            /*
0623:             * (non-Javadoc)
0624:             *
0625:             * @see org.mule.api.transport.Connector#isAlive()
0626:             */
0627:            public boolean isDisposed() {
0628:                return disposed.get();
0629:            }
0630:
0631:            /*
0632:             * (non-Javadoc)
0633:             *
0634:             * @see org.mule.api.transport.Connector#handleException(java.lang.Object,
0635:             *      java.lang.Throwable)
0636:             */
0637:            public void handleException(Exception exception) {
0638:                if (exceptionListener == null) {
0639:                    throw new MuleRuntimeException(CoreMessages
0640:                            .exceptionOnConnectorNotExceptionListener(this 
0641:                                    .getName()), exception);
0642:                } else {
0643:                    exceptionListener.exceptionThrown(exception);
0644:                }
0645:            }
0646:
0647:            /*
0648:             * (non-Javadoc)
0649:             *
0650:             * @see org.mule.util.ExceptionListener#onException(java.lang.Throwable)
0651:             */
0652:            public void exceptionThrown(Exception e) {
0653:                handleException(e);
0654:            }
0655:
0656:            /**
0657:             * @return the ExceptionStrategy for this endpoint
0658:             * @see ExceptionListener
0659:             */
0660:            public ExceptionListener getExceptionListener() {
0661:                return exceptionListener;
0662:            }
0663:
0664:            /**
0665:             * @param listener the ExceptionStrategy to use with this endpoint
0666:             * @see ExceptionListener
0667:             */
0668:            public void setExceptionListener(ExceptionListener listener) {
0669:                exceptionListener = listener;
0670:            }
0671:
0672:            /**
0673:             * @return Returns the dispatcherFactory.
0674:             */
0675:            public MessageDispatcherFactory getDispatcherFactory() {
0676:                return dispatcherFactory;
0677:            }
0678:
0679:            /**
0680:             * @param dispatcherFactory The dispatcherFactory to set.
0681:             */
0682:            public void setDispatcherFactory(
0683:                    MessageDispatcherFactory dispatcherFactory) {
0684:                KeyedPoolableObjectFactory poolFactory;
0685:
0686:                if (dispatcherFactory instanceof  KeyedPoolableObjectFactory) {
0687:                    poolFactory = (KeyedPoolableObjectFactory) dispatcherFactory;
0688:                } else {
0689:                    // need to adapt the factory for use by commons-pool
0690:                    poolFactory = new KeyedPoolMessageDispatcherFactoryAdapter(
0691:                            dispatcherFactory);
0692:                }
0693:
0694:                this .dispatchers.setFactory(poolFactory);
0695:
0696:                // we keep a reference to the unadapted factory, otherwise people might end
0697:                // up with ClassCastExceptions on downcast to their implementation (sigh)
0698:                this .dispatcherFactory = dispatcherFactory;
0699:            }
0700:
0701:            /**
0702:             * @return Returns the requesterFactory.
0703:             */
0704:            public MessageRequesterFactory getRequesterFactory() {
0705:                return requesterFactory;
0706:            }
0707:
0708:            /**
0709:             * @param requesterFactory The requesterFactory to set.
0710:             */
0711:            public void setRequesterFactory(
0712:                    MessageRequesterFactory requesterFactory) {
0713:                KeyedPoolableObjectFactory poolFactory;
0714:
0715:                if (requesterFactory instanceof  KeyedPoolableObjectFactory) {
0716:                    poolFactory = (KeyedPoolableObjectFactory) requesterFactory;
0717:                } else {
0718:                    // need to adapt the factory for use by commons-pool
0719:                    poolFactory = new KeyedPoolMessageRequesterFactoryAdapter(
0720:                            requesterFactory);
0721:                }
0722:
0723:                requesters.setFactory(poolFactory);
0724:
0725:                // we keep a reference to the unadapted factory, otherwise people might end
0726:                // up with ClassCastExceptions on downcast to their implementation (sigh)
0727:                this .requesterFactory = requesterFactory;
0728:            }
0729:
0730:            /**
0731:             * Returns the maximum number of dispatchers that can be concurrently active per
0732:             * endpoint.
0733:             *
0734:             * @return max. number of active dispatchers
0735:             */
0736:            public int getMaxDispatchersActive() {
0737:                return this .dispatchers.getMaxActive();
0738:            }
0739:
0740:            /**
0741:             * Configures the maximum number of dispatchers that can be concurrently active
0742:             * per endpoint
0743:             *
0744:             * @param maxActive max. number of active dispatchers
0745:             */
0746:            public void setMaxDispatchersActive(int maxActive) {
0747:                this .dispatchers.setMaxActive(maxActive);
0748:                // adjust maxIdle in tandem to avoid thrashing
0749:                this .dispatchers.setMaxIdle(maxActive);
0750:            }
0751:
0752:            private MessageDispatcher getDispatcher(OutboundEndpoint endpoint)
0753:                    throws MuleException {
0754:                this .checkDisposed();
0755:
0756:                if (endpoint == null) {
0757:                    throw new IllegalArgumentException(
0758:                            "Endpoint must not be null");
0759:                }
0760:
0761:                if (!supportsProtocol(endpoint.getConnector().getProtocol())) {
0762:                    throw new IllegalArgumentException(CoreMessages
0763:                            .connectorSchemeIncompatibleWithEndpointScheme(
0764:                                    this .getProtocol(),
0765:                                    endpoint.getEndpointURI().toString())
0766:                            .getMessage());
0767:                }
0768:
0769:                MessageDispatcher dispatcher = null;
0770:                try {
0771:                    if (logger.isDebugEnabled()) {
0772:                        logger.debug("Borrowing a dispatcher for endpoint: "
0773:                                + endpoint.getEndpointURI());
0774:                    }
0775:
0776:                    dispatcher = (MessageDispatcher) dispatchers
0777:                            .borrowObject(endpoint);
0778:
0779:                    if (logger.isDebugEnabled()) {
0780:                        logger.debug("Borrowed a dispatcher for endpoint: "
0781:                                + endpoint.getEndpointURI() + " = "
0782:                                + dispatcher.toString());
0783:                    }
0784:
0785:                    return dispatcher;
0786:                } catch (Exception ex) {
0787:                    throw new ConnectorException(CoreMessages
0788:                            .connectorCausedError(), this , ex);
0789:                } finally {
0790:                    try {
0791:                        if (logger.isDebugEnabled()) {
0792:                            logger.debug("Borrowed dispatcher: "
0793:                                    + ObjectUtils.toString(dispatcher, "null"));
0794:                        }
0795:                    } catch (Exception ex) {
0796:                        throw new ConnectorException(CoreMessages
0797:                                .connectorCausedError(), this , ex);
0798:                    }
0799:                }
0800:            }
0801:
0802:            private void returnDispatcher(OutboundEndpoint endpoint,
0803:                    MessageDispatcher dispatcher) {
0804:                if (endpoint != null && dispatcher != null) {
0805:                    try {
0806:                        if (logger.isDebugEnabled()) {
0807:                            logger.debug("Returning dispatcher for endpoint: "
0808:                                    + endpoint.getEndpointURI() + " = "
0809:                                    + dispatcher.toString());
0810:                        }
0811:
0812:                    } catch (Exception ex) {
0813:                        //Logging failed
0814:                    } finally {
0815:                        try {
0816:                            dispatchers.returnObject(endpoint, dispatcher);
0817:                        } catch (Exception e) {
0818:                            // TODO MULE-863: What should we really do?
0819:                            // ignore - if the dispatcher is broken, it will likely get cleaned
0820:                            // up by the factory
0821:                            //RM* I think we should at least log this error so give some indication of what is failing
0822:                            logger
0823:                                    .error(
0824:                                            "Failed to dispose dispatcher for endpoint: "
0825:                                                    + endpoint
0826:                                                    + ". This will cause a memory leak. Please report to",
0827:                                            e);
0828:                        }
0829:                    }
0830:                }
0831:            }
0832:
0833:            /**
0834:             * Returns the maximum number of requesters that can be concurrently active per
0835:             * endpoint.
0836:             *
0837:             * @return max. number of active requesters
0838:             */
0839:            public int getMaxRequestersActive() {
0840:                return this .requesters.getMaxActive();
0841:            }
0842:
0843:            /**
0844:             * Configures the maximum number of requesters that can be concurrently active
0845:             * per endpoint
0846:             *
0847:             * @param maxActive max. number of active requesters
0848:             */
0849:            public void setMaxRequestersActive(int maxActive) {
0850:                this .requesters.setMaxActive(maxActive);
0851:                // adjust maxIdle in tandem to avoid thrashing
0852:                this .requesters.setMaxIdle(maxActive);
0853:            }
0854:
0855:            private MessageRequester getRequester(InboundEndpoint endpoint)
0856:                    throws MuleException {
0857:                this .checkDisposed();
0858:
0859:                if (endpoint == null) {
0860:                    throw new IllegalArgumentException(
0861:                            "Endpoint must not be null");
0862:                }
0863:
0864:                if (!supportsProtocol(endpoint.getConnector().getProtocol())) {
0865:                    throw new IllegalArgumentException(CoreMessages
0866:                            .connectorSchemeIncompatibleWithEndpointScheme(
0867:                                    this .getProtocol(),
0868:                                    endpoint.getEndpointURI().toString())
0869:                            .getMessage());
0870:                }
0871:
0872:                MessageRequester requester = null;
0873:                try {
0874:                    if (logger.isDebugEnabled()) {
0875:                        logger.debug("Borrowing a requester for endpoint: "
0876:                                + endpoint.getEndpointURI());
0877:                    }
0878:
0879:                    requester = (MessageRequester) requesters
0880:                            .borrowObject(endpoint);
0881:
0882:                    if (logger.isDebugEnabled()) {
0883:                        logger.debug("Borrowed a requester for endpoint: "
0884:                                + endpoint.getEndpointURI() + " = "
0885:                                + requester.toString());
0886:                    }
0887:
0888:                    return requester;
0889:                } catch (Exception ex) {
0890:                    throw new ConnectorException(CoreMessages
0891:                            .connectorCausedError(), this , ex);
0892:                } finally {
0893:                    try {
0894:                        if (logger.isDebugEnabled()) {
0895:                            logger.debug("Borrowed requester: "
0896:                                    + ObjectUtils.toString(requester, "null"));
0897:                        }
0898:                    } catch (Exception ex) {
0899:                        throw new ConnectorException(CoreMessages
0900:                                .connectorCausedError(), this , ex);
0901:                    }
0902:                }
0903:            }
0904:
0905:            private void returnRequester(InboundEndpoint endpoint,
0906:                    MessageRequester requester) {
0907:                if (endpoint != null && requester != null) {
0908:                    try {
0909:                        if (logger.isDebugEnabled()) {
0910:                            logger.debug("Returning requester for endpoint: "
0911:                                    + endpoint.getEndpointURI() + " = "
0912:                                    + requester.toString());
0913:                        }
0914:
0915:                    } catch (Exception ex) {
0916:                        //Logging failed
0917:                    } finally {
0918:                        try {
0919:                            requesters.returnObject(endpoint, requester);
0920:                        } catch (Exception e) {
0921:                            // TODO MULE-863: What should we really do?
0922:                            // ignore - if the requester is broken, it will likely get cleaned
0923:                            // up by the factory
0924:                            //RM* I think we should at least log this error so give some indication of what is failing
0925:                            logger
0926:                                    .error(
0927:                                            "Failed to dispose requester for endpoint: "
0928:                                                    + endpoint
0929:                                                    + ". This will cause a memory leak. Please report to",
0930:                                            e);
0931:                        }
0932:                    }
0933:                }
0934:            }
0935:
0936:            protected void checkDisposed() throws DisposeException {
0937:                if (this .isDisposed()) {
0938:                    throw new DisposeException(CoreMessages
0939:                            .cannotUseDisposedConnector(), this );
0940:                }
0941:            }
0942:
0943:            public MessageReceiver registerListener(Service service,
0944:                    InboundEndpoint endpoint) throws Exception {
0945:                if (endpoint == null) {
0946:                    throw new IllegalArgumentException(
0947:                            "The endpoint cannot be null when registering a listener");
0948:                }
0949:
0950:                if (service == null) {
0951:                    throw new IllegalArgumentException(
0952:                            "The service cannot be null when registering a listener");
0953:                }
0954:
0955:                EndpointURI endpointUri = endpoint.getEndpointURI();
0956:                if (endpointUri == null) {
0957:                    throw new ConnectorException(CoreMessages
0958:                            .endpointIsNullForListener(), this );
0959:                }
0960:
0961:                logger.info("Registering listener: " + service.getName()
0962:                        + " on endpointUri: " + endpointUri.toString());
0963:
0964:                if (getReceiver(service, endpoint) != null) {
0965:                    throw new ConnectorException(CoreMessages
0966:                            .listenerAlreadyRegistered(endpointUri), this );
0967:                }
0968:
0969:                MessageReceiver receiver = createReceiver(service, endpoint);
0970:                Object receiverKey = getReceiverKey(service, endpoint);
0971:                receiver.setReceiverKey(receiverKey.toString());
0972:                // Since we're managing the creation we also need to initialise
0973:                receiver.initialise();
0974:                receivers.put(receiverKey, receiver);
0975:                // receivers.put(getReceiverKey(service, endpoint), receiver);
0976:
0977:                return receiver;
0978:            }
0979:
0980:            /**
0981:             * The method determines the key used to store the receiver against.
0982:             *
0983:             * @param service the service for which the endpoint is being registered
0984:             * @param endpoint the endpoint being registered for the service
0985:             * @return the key to store the newly created receiver against
0986:             */
0987:            protected Object getReceiverKey(Service service,
0988:                    InboundEndpoint endpoint) {
0989:                return StringUtils.defaultIfEmpty(endpoint.getEndpointURI()
0990:                        .getFilterAddress(), endpoint.getEndpointURI()
0991:                        .getAddress());
0992:            }
0993:
0994:            public final void unregisterListener(Service service,
0995:                    InboundEndpoint endpoint) throws Exception {
0996:                if (service == null) {
0997:                    throw new IllegalArgumentException(
0998:                            "The service must not be null when you unregister a listener");
0999:                }
1000:
1001:                if (endpoint == null) {
1002:                    throw new IllegalArgumentException(
1003:                            "The endpoint must not be null when you unregister a listener");
1004:                }
1005:
1006:                EndpointURI endpointUri = endpoint.getEndpointURI();
1007:                if (endpointUri == null) {
1008:                    throw new IllegalArgumentException(
1009:                            "The endpointUri must not be null when you unregister a listener");
1010:                }
1011:
1012:                if (logger.isInfoEnabled()) {
1013:                    logger.info("Removing listener on endpointUri: "
1014:                            + endpointUri);
1015:                }
1016:
1017:                if (receivers != null && !receivers.isEmpty()) {
1018:                    MessageReceiver receiver = (MessageReceiver) receivers
1019:                            .remove(getReceiverKey(service, endpoint));
1020:                    if (receiver != null) {
1021:                        destroyReceiver(receiver, endpoint);
1022:                        receiver.dispose();
1023:                    }
1024:                }
1025:            }
1026:
1027:            /**
1028:             * Getter for property 'dispatcherThreadingProfile'.
1029:             *
1030:             * @return Value for property 'dispatcherThreadingProfile'.
1031:             */
1032:            public ThreadingProfile getDispatcherThreadingProfile() {
1033:                if (dispatcherThreadingProfile == null && muleContext != null) {
1034:                    dispatcherThreadingProfile = muleContext
1035:                            .getDefaultMessageDispatcherThreadingProfile();
1036:                }
1037:                return dispatcherThreadingProfile;
1038:            }
1039:
1040:            /**
1041:             * Setter for property 'dispatcherThreadingProfile'.
1042:             *
1043:             * @param dispatcherThreadingProfile Value to set for property
1044:             *            'dispatcherThreadingProfile'.
1045:             */
1046:            public void setDispatcherThreadingProfile(
1047:                    ThreadingProfile dispatcherThreadingProfile) {
1048:                this .dispatcherThreadingProfile = dispatcherThreadingProfile;
1049:            }
1050:
1051:            /**
1052:             * Getter for property 'requesterThreadingProfile'.
1053:             *
1054:             * @return Value for property 'requesterThreadingProfile'.
1055:             */
1056:            public ThreadingProfile getRequesterThreadingProfile() {
1057:                if (requesterThreadingProfile == null && muleContext != null) {
1058:                    requesterThreadingProfile = muleContext
1059:                            .getDefaultMessageRequesterThreadingProfile();
1060:                }
1061:                return requesterThreadingProfile;
1062:            }
1063:
1064:            /**
1065:             * Setter for property 'requesterThreadingProfile'.
1066:             *
1067:             * @param requesterThreadingProfile Value to set for property
1068:             *            'requesterThreadingProfile'.
1069:             */
1070:            public void setRequesterThreadingProfile(
1071:                    ThreadingProfile requesterThreadingProfile) {
1072:                this .requesterThreadingProfile = requesterThreadingProfile;
1073:            }
1074:
1075:            /**
1076:             * Getter for property 'receiverThreadingProfile'.
1077:             *
1078:             * @return Value for property 'receiverThreadingProfile'.
1079:             */
1080:            public ThreadingProfile getReceiverThreadingProfile() {
1081:                if (receiverThreadingProfile == null && muleContext != null) {
1082:                    receiverThreadingProfile = muleContext
1083:                            .getDefaultMessageReceiverThreadingProfile();
1084:                }
1085:                return receiverThreadingProfile;
1086:            }
1087:
1088:            /**
1089:             * Setter for property 'receiverThreadingProfile'.
1090:             *
1091:             * @param receiverThreadingProfile Value to set for property
1092:             *            'receiverThreadingProfile'.
1093:             */
1094:            public void setReceiverThreadingProfile(
1095:                    ThreadingProfile receiverThreadingProfile) {
1096:                this .receiverThreadingProfile = receiverThreadingProfile;
1097:            }
1098:
1099:            public void destroyReceiver(MessageReceiver receiver,
1100:                    InboundEndpoint endpoint) throws Exception {
1101:                receiver.dispose();
1102:            }
1103:
            /**
             * Template method for subclass-specific initialisation work. Presumably
             * invoked while the connector is being initialised; the caller is not
             * visible in this part of the file.
             *
             * @throws InitialisationException if the method fails
             */
            protected abstract void doInitialise()
                    throws InitialisationException;
1106:
            /**
             * Template method to perform any work when destroying the connector
             */
            protected abstract void doDispose();
1111:
            /**
             * Template method to perform any work when starting the connector
             *
             * @throws MuleException if the method fails
             */
            protected abstract void doStart() throws MuleException;
1118:
            /**
             * Template method to perform any work when stopping the connector
             *
             * @throws MuleException if the method fails
             */
            protected abstract void doStop() throws MuleException;
1125:
1126:            public List getDefaultInboundTransformers() {
1127:                if (serviceDescriptor == null) {
1128:                    throw new RuntimeException(
1129:                            "serviceDescriptor not initialized");
1130:                }
1131:                return TransformerUtils
1132:                        .getDefaultInboundTransformers(serviceDescriptor);
1133:            }
1134:
1135:            public List getDefaultResponseTransformers() {
1136:                if (serviceDescriptor == null) {
1137:                    throw new RuntimeException(
1138:                            "serviceDescriptor not initialized");
1139:                }
1140:                return TransformerUtils
1141:                        .getDefaultResponseTransformers(serviceDescriptor);
1142:            }
1143:
1144:            public List getDefaultOutboundTransformers() {
1145:                if (serviceDescriptor == null) {
1146:                    throw new RuntimeException(
1147:                            "serviceDescriptor not initialized");
1148:                }
1149:                return TransformerUtils
1150:                        .getDefaultOutboundTransformers(serviceDescriptor);
1151:            }
1152:
1153:            /**
1154:             * Getter for property 'replyToHandler'.
1155:             *
1156:             * @return Value for property 'replyToHandler'.
1157:             */
1158:            public ReplyToHandler getReplyToHandler() {
1159:                return new DefaultReplyToHandler(
1160:                        getDefaultResponseTransformers());
1161:            }
1162:
1163:            /**
1164:             * Fires a server notification to all registered listeners
1165:             *
1166:             * @param notification the notification to fire.
1167:             */
1168:            public void fireNotification(ServerNotification notification) {
1169:                cachedNotificationHandler.fireNotification(notification);
1170:            }
1171:
1172:            /**
1173:             * Getter for property 'connectionStrategy'.
1174:             *
1175:             * @return Value for property 'connectionStrategy'.
1176:             */
1177:            //TODO RM* REMOVE
1178:            public ConnectionStrategy getConnectionStrategy() {
1179:                // not happy with this but each receiver needs its own instance
1180:                // of the connection strategy and using a factory just introduces extra
1181:                // implementation
1182:                try {
1183:                    return (ConnectionStrategy) BeanUtils
1184:                            .cloneBean(connectionStrategy);
1185:                } catch (Exception e) {
1186:                    throw new MuleRuntimeException(CoreMessages
1187:                            .failedToClone("connectionStrategy"), e);
1188:                }
1189:            }
1190:
1191:            /**
1192:             * Setter for property 'connectionStrategy'.
1193:             *
1194:             * @param connectionStrategy Value to set for property 'connectionStrategy'.
1195:             */
1196:            public void setConnectionStrategy(
1197:                    ConnectionStrategy connectionStrategy) {
1198:                this .connectionStrategy = connectionStrategy;
1199:            }
1200:
1201:            /** {@inheritDoc} */
1202:            public boolean isDisposing() {
1203:                return disposing.get();
1204:            }
1205:
            /**
             * Indicates whether remote sync is enabled. This implementation always
             * answers <code>false</code>.
             *
             * @return <code>false</code>
             */
            public boolean isRemoteSyncEnabled() {
                return false;
            }
1209:
            /**
             * Indicates whether sync is enabled for the given protocol. This
             * implementation always answers <code>false</code> and does not look at the
             * argument.
             *
             * @param protocol the protocol being asked about (ignored here)
             * @return <code>false</code>
             */
            public boolean isSyncEnabled(String protocol) {
                return false;
            }
1213:
1214:            public MessageReceiver getReceiver(Service service,
1215:                    InboundEndpoint endpoint) {
1216:                if (receivers != null) {
1217:                    Object key = getReceiverKey(service, endpoint);
1218:                    if (key != null) {
1219:                        return (MessageReceiver) receivers.get(key);
1220:                    } else {
1221:                        throw new RuntimeException(
1222:                                "getReceiverKey() returned a null key");
1223:                    }
1224:                } else {
1225:                    throw new RuntimeException(
1226:                            "Connector has not been initialized.");
1227:                }
1228:            }
1229:
1230:            /**
1231:             * Getter for property 'receivers'.
1232:             *
1233:             * @return Value for property 'receivers'.
1234:             */
1235:            public Map getReceivers() {
1236:                return Collections.unmodifiableMap(receivers);
1237:            }
1238:
1239:            public MessageReceiver lookupReceiver(String key) {
1240:                if (key != null) {
1241:                    return (MessageReceiver) receivers.get(key);
1242:                } else {
1243:                    throw new IllegalArgumentException(
1244:                            "Receiver key must not be null");
1245:                }
1246:            }
1247:
1248:            public MessageReceiver[] getReceivers(String wildcardExpression) {
1249:                WildcardFilter filter = new WildcardFilter(wildcardExpression);
1250:                filter.setCaseSensitive(false);
1251:
1252:                List found = new ArrayList();
1253:
1254:                for (Iterator iterator = receivers.entrySet().iterator(); iterator
1255:                        .hasNext();) {
1256:                    Map.Entry e = (Map.Entry) iterator.next();
1257:                    if (filter.accept(e.getKey())) {
1258:                        found.add(e.getValue());
1259:                    }
1260:                }
1261:
1262:                return (MessageReceiver[]) CollectionUtils
1263:                        .toArrayOfComponentType(found, MessageReceiver.class);
1264:            }
1265:
            /**
             * Connects this connector through its connection strategy.
             * <p>
             * The method is written to be re-entered. On the first call the
             * <code>connecting</code> flag is switched on and the work is delegated to
             * the connection strategy, after which the call returns. According to the
             * comments in the body, the strategy calls back into this method; on that
             * nested call <code>doConnect()</code> runs, the <code>connected</code>
             * flag is set, a CONNECTION_CONNECTED notification is fired and, if
             * <code>startOnConnect</code> is set, the connector is started.
             * <p>
             * Calling this method on an already connected connector returns
             * immediately.
             *
             * @throws Exception if the connector has been disposed, or if connecting
             *             fails. In the latter case both flags are reset and a
             *             CONNECTION_FAILED notification is fired; a
             *             <code>ConnectException</code> or
             *             <code>FatalConnectException</code> is rethrown unchanged and
             *             anything else is wrapped in a <code>ConnectException</code>.
             */
            public void connect() throws Exception {
                this .checkDisposed();

                // already connected: nothing to do
                if (connected.get()) {
                    return;
                }

                /*
                    Until the recursive startConnector() -> connect() -> doConnect() -> connect()
                    calls are unwound between a connector and connection strategy, this call has
                    to be here, and not below (commented out currently). Otherwise, e.g. WebSphere MQ
                    goes into an endless reconnect thrashing loop, see MULE-1150 for more details.
                 */
                try {
                    // only true on the nested call, i.e. after the flag was set below
                    if (connecting.get()) {
                        this .doConnect();
                    }
                    // succeeds only on the outer call, when the flag was still false
                    if (connecting.compareAndSet(false, true)) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Connecting: " + this );
                        }

                        connectionStrategy.connect(this );

                        logger.info("Connected: " + getConnectionDescription());
                        // This method calls itself so the connecting flag is set first, then
                        // the connection is made on the second call
                        return;
                    }

                    // see the explanation above
                    //this.doConnect();
                    connected.set(true);
                    connecting.set(false);

                    this .fireNotification(new ConnectionNotification(this ,
                            getConnectEventId(),
                            ConnectionNotification.CONNECTION_CONNECTED));
                } catch (Exception e) {
                    // any failure resets both flags so that a later attempt starts clean
                    connected.set(false);
                    connecting.set(false);

                    this .fireNotification(new ConnectionNotification(this ,
                            getConnectEventId(),
                            ConnectionNotification.CONNECTION_FAILED));

                    if (e instanceof  ConnectException
                            || e instanceof  FatalConnectException) {
                        // rethrow
                        throw e;
                    } else {
                        throw new ConnectException(e, this );
                    }
                }

                // only reached on the nested call; the outer call returned inside the try
                if (startOnConnect.get()) {
                    this .start();
                }
                //TODO RM*. If the connection strategy is called on the receivers, the connector strategy gets called too,
                //to ensure its connected. Therefore the connect method on the connector needs to be idempotent and not try
                //and connect dispatchers or receivers

                //        else
                //        {
                //            for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
                //            {
                //                MessageReceiver receiver = (MessageReceiver)iterator.next();
                //                if (logger.isDebugEnabled())
                //                {
                //                    logger.debug("Connecting receiver on endpoint: "
                //                                    + receiver.getEndpoint().getEndpointURI());
                //                }
                //                receiver.connect();
                //            }
                //        }
            }
1342:
1343:            public void disconnect() throws Exception {
1344:                startOnConnect.set(this .isStarted());
1345:
1346:                this .fireNotification(new ConnectionNotification(this ,
1347:                        getConnectEventId(),
1348:                        ConnectionNotification.CONNECTION_DISCONNECTED));
1349:
1350:                connected.set(false);
1351:
1352:                try {
1353:                    this .doDisconnect();
1354:                } finally {
1355:                    this .stop();
1356:                }
1357:
1358:                logger.info("Disconnected: " + this .getConnectionDescription());
1359:            }
1360:
1361:            public String getConnectionDescription() {
1362:                return this .toString();
1363:            }
1364:
1365:            public final boolean isConnected() {
1366:                return connected.get();
1367:            }
1368:
1369:            /**
1370:             * Template method in which a concrete connector makes the connections it needs.
1371:             * NOTE(review): the only call visible in connect() is commented out - confirm who invokes this.
1372:             * @throws Exception any failure raised by the implementing connector
1373:             */
1374:            protected abstract void doConnect() throws Exception;
1375:
1376:            /**
1377:             * Template method in which a concrete connector disconnects any connected
1378:             * resources it uses. Invoked by {@link #disconnect()}, which still stops the
1379:             * connector when this method throws.
1380:             * @throws Exception any failure raised by the implementing connector
1381:             */
1382:            protected abstract void doDisconnect() throws Exception;
1383:
1384:            /**
1385:             * The resource id used when firing ConnectEvents from this connector
1386:             *
1387:             * @return the resource id used when firing ConnectEvents from this connector
1388:             */
1389:            protected String getConnectEventId() {
1390:                return getName();
1391:            }
1392:
1393:            /**
1394:             * For better throughput when using TransactedMessageReceivers this will enable a
1395:             * number of concurrent receivers, based on the value returned by
1396:             * {@link #getNumberOfConcurrentTransactedReceivers()}. This property is used by
1397:             * transports that support transactions, specifically receivers that extend the
1398:             * TransactedPollingMessageReceiver.
1399:             *
1400:             * @return true if multiple receivers will be enabled for this connection
1401:             */
1402:            public boolean isCreateMultipleTransactedReceivers() {
1403:                return createMultipleTransactedReceivers;
1404:            }
1405:
1406:            /**
1407:             * @see {@link #isCreateMultipleTransactedReceivers()}
1408:             * @param createMultipleTransactedReceivers if true, multiple receivers will be
1409:             *            created for this connection
1410:             */
1411:            public void setCreateMultipleTransactedReceivers(
1412:                    boolean createMultipleTransactedReceivers) {
1413:                this .createMultipleTransactedReceivers = createMultipleTransactedReceivers;
1414:            }
1415:
1416:            /**
1417:             * Returns the number of concurrent receivers that will be launched when
1418:             * {@link #isCreateMultipleTransactedReceivers()} returns <code>true</code>.
1419:             *
1420:             * @see #DEFAULT_NUM_CONCURRENT_TX_RECEIVERS
1421:             */
1422:            public int getNumberOfConcurrentTransactedReceivers() {
1423:                return numberOfConcurrentTransactedReceivers;
1424:            }
1425:
1426:            /**
1427:             * @see {@link #getNumberOfConcurrentTransactedReceivers()}
1428:             * @param count the number of concurrent transacted receivers to start
1429:             */
1430:            public void setNumberOfConcurrentTransactedReceivers(int count) {
1431:                numberOfConcurrentTransactedReceivers = count;
1432:            }
1433:
1434:            public void setDynamicNotification(boolean dynamic) {
1435:                dynamicNotification = dynamic;
1436:            }
1437:
1438:            protected void updateCachedNotificationHandler() {
1439:                if (null != muleContext) {
1440:                    if (dynamicNotification) {
1441:                        cachedNotificationHandler = muleContext
1442:                                .getNotificationManager();
1443:                    } else {
1444:                        cachedNotificationHandler = new OptimisedNotificationHandler(
1445:                                muleContext.getNotificationManager(),
1446:                                MessageNotification.class);
1447:                    }
1448:                }
1449:            }
1450:
1451:            public boolean isEnableMessageEvents() {
1452:                return cachedNotificationHandler
1453:                        .isNotificationEnabled(MessageNotification.class);
1454:            }
1455:
1456:            /**
1457:             * Registers other protocols 'understood' by this connector. These must contain
1458:             * scheme meta info. Any protocol registered must begin with the protocol of this
1459:             * connector, i.e. If the connector is axis the protocol for jms over axis will
1460:             * be axis:jms. Here, 'axis' is the scheme meta info and 'jms' is the protocol.
1461:             * If the protocol argument does not start with the connector's protocol, it will
1462:             * be appended.
1463:             *
1464:             * @param protocol the supported protocol to register
1465:             */
1466:            public void registerSupportedProtocol(String protocol) {
1467:                protocol = protocol.toLowerCase();
1468:                if (protocol.startsWith(getProtocol().toLowerCase())) {
1469:                    registerSupportedProtocolWithoutPrefix(protocol);
1470:                } else {
1471:                    supportedProtocols.add(getProtocol().toLowerCase() + ":"
1472:                            + protocol);
1473:                }
1474:            }
1475:
1476:            /**
1477:             * Registers other protocols 'understood' by this connector. These must contain
1478:             * scheme meta info. Unlike the {@link #registerSupportedProtocol(String)} method,
1479:             * this allows you to register protocols that are not prefixed with the connector
1480:             * protocol. This is useful where you use a Service Finder to discover which
1481:             * Transport implementation to use. For example the 'wsdl' transport is a generic
1482:             * 'finder' transport that will use Axis, Xfire or Glue to create the WSDL
1483:             * client. These transport protocols would be wsdl-axis, wsdl-xfire and
1484:             * wsdl-glue, but they can all support 'wsdl' protocol too.
1485:             *
1486:             * @param protocol the supported protocol to register
1487:             */
1488:            protected void registerSupportedProtocolWithoutPrefix(
1489:                    String protocol) {
1490:                supportedProtocols.add(protocol.toLowerCase());
1491:            }
1492:
1493:            public void unregisterSupportedProtocol(String protocol) {
1494:                protocol = protocol.toLowerCase();
1495:                if (protocol.startsWith(getProtocol().toLowerCase())) {
1496:                    supportedProtocols.remove(protocol);
1497:                } else {
1498:                    supportedProtocols.remove(getProtocol().toLowerCase() + ":"
1499:                            + protocol);
1500:                }
1501:            }
1502:
1503:            /**
1504:             * @return true if the protocol is supported by this connector.
1505:             */
1506:            public boolean supportsProtocol(String protocol) {
1507:                return supportedProtocols.contains(protocol.toLowerCase());
1508:            }
1509:
1510:            /**
1511:             * Returns an unmodifiable list of the protocols supported by this connector
1512:             *
1513:             * @return an unmodifiable list of the protocols supported by this connector
1514:             */
1515:            public List getSupportedProtocols() {
1516:                return Collections.unmodifiableList(supportedProtocols);
1517:            }
1518:
1519:            /**
1520:             * Sets A list of protocols that the connector can accept
1521:             *
1522:             * @param supportedProtocols
1523:             */
1524:            public void setSupportedProtocols(List supportedProtocols) {
1525:                for (Iterator iterator = supportedProtocols.iterator(); iterator
1526:                        .hasNext();) {
1527:                    String s = (String) iterator.next();
1528:                    registerSupportedProtocol(s);
1529:                }
1530:            }
1531:
1532:            /**
1533:             * Returns a work manager for message receivers.
1534:             */
1535:            protected WorkManager getReceiverWorkManager(String receiverName)
1536:                    throws MuleException {
1537:                return (WorkManager) receiverWorkManager.get();
1538:            }
1539:
1540:            /**
1541:             * Returns a work manager for message dispatchers.
1542:             *
1543:             * @throws MuleException in case of error
1544:             */
1545:            protected WorkManager getDispatcherWorkManager()
1546:                    throws MuleException {
1547:                return (WorkManager) dispatcherWorkManager.get();
1548:            }
1549:
1550:            /**
1551:             * Returns a work manager for message requesters.
1552:             *
1553:             * @throws MuleException in case of error
1554:             */
1555:            protected WorkManager getRequesterWorkManager()
1556:                    throws MuleException {
1557:                return (WorkManager) requesterWorkManager.get();
1558:            }
1559:
1560:            /**
1561:             * Returns a Scheduler service for periodic tasks, currently limited to internal
1562:             * use. Note: getScheduler() currently conflicts with the same method in the
1563:             * Quartz transport
1564:             */
1565:            public ScheduledExecutorService getScheduler() {
1566:                if (scheduler.get() == null) {
1567:                    ThreadFactory threadFactory = new NamedThreadFactory(this 
1568:                            .getName()
1569:                            + ".scheduler");
1570:                    ScheduledThreadPoolExecutor newExecutor = new ScheduledThreadPoolExecutor(
1571:                            4, threadFactory);
1572:                    newExecutor
1573:                            .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
1574:                    newExecutor.setKeepAliveTime(this 
1575:                            .getReceiverThreadingProfile().getThreadTTL(),
1576:                            TimeUnit.MILLISECONDS);
1577:                    newExecutor.allowCoreThreadTimeOut(true);
1578:
1579:                    if (!scheduler.compareAndSet(null, newExecutor)) {
1580:                        // someone else was faster, ditch our copy.
1581:                        newExecutor.shutdown();
1582:                    }
1583:                }
1584:
1585:                return (ScheduledExecutorService) scheduler.get();
1586:            }
1587:
1588:            /**
1589:             * Getter for property 'sessionHandler'.
1590:             *
1591:             * @return Value for property 'sessionHandler'.
1592:             */
1593:            public SessionHandler getSessionHandler() {
1594:                return sessionHandler;
1595:            }
1596:
1597:            /**
1598:             * Setter for property 'sessionHandler'.
1599:             *
1600:             * @param sessionHandler Value to set for property 'sessionHandler'.
1601:             */
1602:            public void setSessionHandler(SessionHandler sessionHandler) {
1603:                this .sessionHandler = sessionHandler;
1604:            }
1605:
1606:            public void workAccepted(WorkEvent event) {
1607:                this .handleWorkException(event, "workAccepted");
1608:            }
1609:
1610:            public void workRejected(WorkEvent event) {
1611:                this .handleWorkException(event, "workRejected");
1612:            }
1613:
1614:            public void workStarted(WorkEvent event) {
1615:                this .handleWorkException(event, "workStarted");
1616:            }
1617:
1618:            public void workCompleted(WorkEvent event) {
1619:                this .handleWorkException(event, "workCompleted");
1620:            }
1621:
1622:            protected void handleWorkException(WorkEvent event, String type) {
1623:                if (event == null) {
1624:                    return;
1625:                }
1626:
1627:                Throwable e = event.getException();
1628:
1629:                if (e == null) {
1630:                    return;
1631:                }
1632:
1633:                if (e.getCause() != null) {
1634:                    e = e.getCause();
1635:                }
1636:
1637:                logger.error("Work caused exception on '" + type
1638:                        + "'. Work being executed was: "
1639:                        + event.getWork().toString());
1640:
1641:                if (e instanceof  Exception) {
1642:                    this .handleException((Exception) e);
1643:                } else {
1644:                    throw new MuleRuntimeException(CoreMessages
1645:                            .connectorCausedError(this .getName()), e);
1646:                }
1647:            }
1648:
1649:            // TODO the following methods should probably be lifecycle-enabled;
1650:            // for now they are only stubs to get the refactoring going.
1651:
1652:            public void dispatch(OutboundEndpoint endpoint, MuleEvent event)
1653:                    throws DispatchException {
1654:                MessageDispatcher dispatcher = null;
1655:
1656:                try {
1657:                    dispatcher = this .getDispatcher(endpoint);
1658:                    dispatcher.dispatch(event);
1659:                } catch (DispatchException dex) {
1660:                    throw dex;
1661:                } catch (MuleException ex) {
1662:                    throw new DispatchException(event.getMessage(), endpoint,
1663:                            ex);
1664:                } finally {
1665:                    this .returnDispatcher(endpoint, dispatcher);
1666:                }
1667:            }
1668:
1669:            /**
1670:             * This method will return the dispatcher to the pool or, if the payload is an inputstream,
1671:             * replace the payload with a new DelegatingInputStream which returns the dispatcher to
1672:             * the pool when the stream is closed.
1673:             *
1674:             * @param endpoint
1675:             * @param dispatcher
1676:             * @param result
1677:             */
1678:            protected void setupDispatchReturn(final OutboundEndpoint endpoint,
1679:                    final MessageDispatcher dispatcher, MuleMessage result) {
1680:                if (result != null
1681:                        && result.getPayload() instanceof  InputStream) {
1682:                    DelegatingInputStream is = new DelegatingInputStream(
1683:                            (InputStream) result.getPayload()) {
1684:                        public void close() throws IOException {
1685:                            try {
1686:                                super .close();
1687:                            } finally {
1688:                                returnDispatcher(endpoint, dispatcher);
1689:                            }
1690:                        }
1691:                    };
1692:                    result.setPayload(is);
1693:                } else {
1694:
1695:                    this .returnDispatcher(endpoint, dispatcher);
1696:                }
1697:            }
1698:
1699:            public MuleMessage request(String uri, long timeout)
1700:                    throws Exception {
1701:                return request(getMuleContext().getRegistry()
1702:                        .lookupEndpointFactory().getInboundEndpoint(uri),
1703:                        timeout);
1704:            }
1705:
1706:            public MuleMessage request(InboundEndpoint endpoint, long timeout)
1707:                    throws Exception {
1708:                MessageRequester requester = null;
1709:                MuleMessage result = null;
1710:                try {
1711:                    requester = this .getRequester(endpoint);
1712:                    result = requester.request(timeout);
1713:                    return result;
1714:                } finally {
1715:                    setupRequestReturn(endpoint, requester, result);
1716:                }
1717:            }
1718:
1719:            /**
1720:             * This method will return the requester to the pool or, if the payload is an inputstream,
1721:             * replace the payload with a new DelegatingInputStream which returns the requester to
1722:             * the pool when the stream is closed.
1723:             *
1724:             * @param endpoint
1725:             * @param requester
1726:             * @param result
1727:             */
1728:            protected void setupRequestReturn(final InboundEndpoint endpoint,
1729:                    final MessageRequester requester, MuleMessage result) {
1730:                if (result != null
1731:                        && result.getPayload() instanceof  InputStream) {
1732:                    DelegatingInputStream is = new DelegatingInputStream(
1733:                            (InputStream) result.getPayload()) {
1734:                        public void close() throws IOException {
1735:                            try {
1736:                                super .close();
1737:                            } finally {
1738:                                returnRequester(endpoint, requester);
1739:                            }
1740:                        }
1741:                    };
1742:                    result.setPayload(is);
1743:                } else {
1744:
1745:                    this .returnRequester(endpoint, requester);
1746:                }
1747:            }
1748:
1749:            public MuleMessage send(OutboundEndpoint endpoint, MuleEvent event)
1750:                    throws DispatchException {
1751:                MessageDispatcher dispatcher = null;
1752:
1753:                try {
1754:                    dispatcher = this .getDispatcher(endpoint);
1755:                    return dispatcher.send(event);
1756:                } catch (DispatchException dex) {
1757:                    throw dex;
1758:                } catch (MuleException ex) {
1759:                    throw new DispatchException(event.getMessage(), endpoint,
1760:                            ex);
1761:                } finally {
1762:                    this .returnDispatcher(endpoint, dispatcher);
1763:                }
1764:            }
1765:
1766:            // -------- Methods from the removed AbstractServiceEnabled Connector
1767:
1768:            /**
1769:             * When this connector is created via the
1770:             * {@link org.mule.transport.service.TransportFactory} the endpoint used to
1771:             * determine the connector type is passed to this method so that any properties
1772:             * set on the endpoint that can be used to initialise the connector are made
1773:             * available.
1774:             *
1775:             * @param endpointUri the {@link EndpointURI} use to create this connector
1776:             * @throws InitialisationException If there are any problems with the
1777:             *             configuration set on the Endpoint or if another exception is
1778:             *             thrown it is wrapped in an InitialisationException.
1779:             */
1780:            public void initialiseFromUrl(EndpointURI endpointUri)
1781:                    throws InitialisationException {
1782:                if (!supportsProtocol(endpointUri.getFullScheme())) {
1783:                    throw new InitialisationException(CoreMessages
1784:                            .schemeNotCompatibleWithConnector(endpointUri
1785:                                    .getFullScheme(), this .getClass()), this );
1786:                }
1787:                Properties props = new Properties();
1788:                props.putAll(endpointUri.getParams());
1789:                // auto set username and password
1790:                if (endpointUri.getUserInfo() != null) {
1791:                    props.setProperty("username", endpointUri.getUser());
1792:                    String passwd = endpointUri.getPassword();
1793:                    if (passwd != null) {
1794:                        props.setProperty("password", passwd);
1795:                    }
1796:                }
1797:                String host = endpointUri.getHost();
1798:                if (host != null) {
1799:                    props.setProperty("hostname", host);
1800:                    props.setProperty("host", host);
1801:                }
1802:                if (endpointUri.getPort() > -1) {
1803:                    props.setProperty("port", String.valueOf(endpointUri
1804:                            .getPort()));
1805:                }
1806:
1807:                org.mule.util.BeanUtils.populateWithoutFail(this , props, true);
1808:
1809:                setName(ObjectNameHelper.getConnectorName(this ));
1810:                //initialise();
1811:            }
1812:
1813:            /**
1814:             * Initialises this connector from its {@link TransportServiceDescriptor} This
1815:             * will be called before the {@link #doInitialise()} method is called.
1816:             *
1817:             * @throws InitialisationException InitialisationException If there are any
1818:             *             problems with the configuration or if another exception is thrown
1819:             *             it is wrapped in an InitialisationException.
1820:             */
1821:            protected synchronized void initFromServiceDescriptor()
1822:                    throws InitialisationException {
1823:                try {
1824:                    serviceDescriptor = (TransportServiceDescriptor) RegistryContext
1825:                            .getRegistry()
1826:                            .lookupServiceDescriptor(
1827:                                    ServiceDescriptorFactory.PROVIDER_SERVICE_TYPE,
1828:                                    getProtocol().toLowerCase(),
1829:                                    serviceOverrides);
1830:                    if (serviceDescriptor == null) {
1831:                        throw new ServiceException(CoreMessages
1832:                                .noServiceTransportDescriptor(getProtocol()));
1833:                    }
1834:
1835:                    if (logger.isDebugEnabled()) {
1836:                        logger
1837:                                .debug("Loading DispatcherFactory for connector: "
1838:                                        + getName()
1839:                                        + " ("
1840:                                        + getClass().getName() + ")");
1841:                    }
1842:
1843:                    MessageDispatcherFactory df = serviceDescriptor
1844:                            .createDispatcherFactory();
1845:                    if (df != null) {
1846:                        this .setDispatcherFactory(df);
1847:                    } else if (logger.isDebugEnabled()) {
1848:                        logger.debug("Transport '" + getProtocol()
1849:                                + "' will not support outbound endpoints: ");
1850:                    }
1851:
1852:                    if (logger.isDebugEnabled()) {
1853:                        logger
1854:                                .debug("Loading RequesterFactory for connector: "
1855:                                        + getName()
1856:                                        + " ("
1857:                                        + getClass().getName() + ")");
1858:                    }
1859:
1860:                    MessageRequesterFactory rf = serviceDescriptor
1861:                            .createRequesterFactory();
1862:                    if (rf != null) {
1863:                        this .setRequesterFactory(rf);
1864:                    } else if (logger.isDebugEnabled()) {
1865:                        logger.debug("Transport '" + getProtocol()
1866:                                + "' will not support requests: ");
1867:                    }
1868:
1869:                    sessionHandler = serviceDescriptor.createSessionHandler();
1870:
1871:                    // TODO Do we still need to support this for 2.x?
1872:                    // Set any manager default properties for the connector. These are set on
1873:                    // the Manager with a protocol e.g. jms.specification=1.1
1874:                    // This provides a really convenient way to set properties on an object
1875:                    // from unit tests
1876:                    //            Map props = new HashMap();
1877:                    //            PropertiesUtils.getPropertiesWithPrefix(muleContext.getRegistry().lookupProperties(), getProtocol()
1878:                    //                .toLowerCase(), props);
1879:                    //            if (props.size() > 0)
1880:                    //            {
1881:                    //                props = PropertiesUtils.removeNamespaces(props);
1882:                    //                org.mule.util.BeanUtils.populateWithoutFail(this, props, true);
1883:                    //            }
1884:                } catch (Exception e) {
1885:                    throw new InitialisationException(e, this );
1886:                }
1887:            }
1888:
1889:            /**
1890:             * Get the {@link TransportServiceDescriptor} for this connector. This will be
1891:             * null if the connector was created by the developer. To create a connector the
1892:             * proper way the developer should use the {@link TransportFactory} and pass in
1893:             * an endpoint.
1894:             *
1895:             * @return the {@link TransportServiceDescriptor} for this connector
1896:             */
1897:            protected TransportServiceDescriptor getServiceDescriptor() {
1898:                if (serviceDescriptor == null) {
1899:                    throw new IllegalStateException(
1900:                            "This connector has not yet been initialised: "
1901:                                    + name);
1902:                }
1903:                return serviceDescriptor;
1904:            }
1905:
1906:            /**
1907:             * Create a Message receiver for this connector
1908:             *
1909:             * @param service the service that will receive events from this receiver,
1910:             *            the listener
1911:             * @param endpoint the endpoint that defies this inbound communication
1912:             * @return an instance of the message receiver defined in this connectors'
1913:             *         {@link org.mule.transport.service.TransportServiceDescriptor}
1914:             *         initialised using the service and endpoint.
1915:             * @throws Exception if there is a problem creating the receiver. This exception
1916:             *             really depends on the underlying transport, thus any exception
1917:             *             could be thrown
1918:             */
1919:            protected MessageReceiver createReceiver(Service service,
1920:                    InboundEndpoint endpoint) throws Exception {
1921:                return getServiceDescriptor().createMessageReceiver(this ,
1922:                        service, endpoint);
1923:            }
1924:
1925:            /**
1926:             * Gets a <code>MessageAdapter</code> for the endpoint for the given message
1927:             * (data)
1928:             *
1929:             * @param message the data with which to initialise the
1930:             *            <code>MessageAdapter</code>
1931:             * @return the <code>MessageAdapter</code> for the endpoint
1932:             * @throws org.mule.api.MessagingException if the message parameter is not
1933:             *             supported
1934:             * @see org.mule.api.transport.MessageAdapter
1935:             */
1936:            public MessageAdapter getMessageAdapter(Object message)
1937:                    throws MessagingException {
1938:                try {
1939:                    return serviceDescriptor.createMessageAdapter(message);
1940:                } catch (TransportServiceException e) {
1941:                    throw new MessagingException(CoreMessages
1942:                            .failedToCreate("Message Adapter"), message, e);
1943:                }
1944:            }
1945:
1946:            /**
1947:             * A map of fully qualified class names that should override those in the
1948:             * connectors' service descriptor This map will be null if there are no overrides
1949:             *
1950:             * @return a map of override values or null
1951:             */
1952:            public Map getServiceOverrides() {
1953:                return serviceOverrides;
1954:            }
1955:
1956:            /**
1957:             * Set the Service overrides on this connector.
1958:             *
1959:             * @param serviceOverrides the override values to use
1960:             */
1961:            public void setServiceOverrides(Map serviceOverrides) {
1962:                this .serviceOverrides = new Properties();
1963:                this .serviceOverrides.putAll(serviceOverrides);
1964:            }
1965:
1966:            /**
1967:             * Supplies the output stream for writing to the given outbound endpoint,
1968:             * typically needed only when streaming is used on that endpoint. This
1969:             * implementation always throws {@link UnsupportedOperationException}. Any
1970:             * stream returned by an override MUST release its resources on close; see
1971:             * {@link org.mule.model.streaming.CallbackOutputStream} for help doing so.
1972:             *
1973:             * @param endpoint the endpoint that relates to this dispatcher
1974:             * @param message the current message being processed
1975:             * @return the output stream to use for this request
1976:             * @throws MuleException in case of any error
1977:             */
1978:            public OutputStream getOutputStream(OutboundEndpoint endpoint, MuleMessage message)
1979:                    throws MuleException {
1980:                String reason = CoreMessages.streamingNotSupported(getProtocol()).toString();
1981:                throw new UnsupportedOperationException(reason);
1982:            }
1983:
1984:            /** Returns the MuleContext reference currently held by this connector
1985:             *  (whatever was last assigned to the muleContext field). */
1986:            public MuleContext getMuleContext() { return this.muleContext; }
1987:
1988:            public void setMuleContext(MuleContext context) {
1989:                muleContext = context; // stored before the handler refresh below
1990:                updateCachedNotificationHandler(); // presumably re-derives the cached handler from the new context - confirm
1991:            }
1992:
1993:            // @Override
1994:            public String toString() {
1995:                // Layout: SimpleClassName{this=<identity hash, hex>, key=value, ...}.
1996:                // Built as one fluent chain on a StringBuffer pre-sized to 120 chars;
1997:                // only the name value is wrapped in single quotes.
1998:                return new StringBuffer(120)
1999:                        .append(ClassUtils.getSimpleName(getClass()))
2000:                        .append("{this=").append(Integer.toHexString(System.identityHashCode(this)))
2001:                        .append(", started=").append(started)
2002:                        .append(", initialised=").append(initialised)
2003:                        .append(", name='").append(name).append('\'')
2004:                        .append(", disposed=").append(disposed)
2005:                        .append(", numberOfConcurrentTransactedReceivers=").append(numberOfConcurrentTransactedReceivers)
2006:                        .append(", createMultipleTransactedReceivers=").append(createMultipleTransactedReceivers)
2007:                        .append(", connected=").append(connected)
2008:                        .append(", supportedProtocols=").append(supportedProtocols)
2009:                        .append(", serviceOverrides=").append(serviceOverrides)
2010:                        .append('}')
2011:                        .toString();
2012:            }
2013:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.