001: /*
002: * $Id: AbstractConnectable.java 10489 2008-01-23 17:53:38Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport;
012:
013: import org.mule.api.MuleException;
014: import org.mule.api.MuleRuntimeException;
015: import org.mule.api.context.WorkManager;
016: import org.mule.api.endpoint.ImmutableEndpoint;
017: import org.mule.api.transport.Connectable;
018: import org.mule.api.transport.ConnectionStrategy;
019: import org.mule.api.transport.Connector;
020: import org.mule.config.i18n.CoreMessages;
021: import org.mule.context.notification.ConnectionNotification;
022: import org.mule.util.ClassUtils;
023:
024: import java.beans.ExceptionListener;
025:
026: import org.apache.commons.logging.Log;
027: import org.apache.commons.logging.LogFactory;
028:
029: /**
030: * Provide a default dispatch (client) support for handling threads lifecycle and validation.
031: */
032: public abstract class AbstractConnectable implements Connectable,
033: ExceptionListener {
034:
035: /**
036: * logger used by this class
037: */
038: protected transient Log logger = LogFactory.getLog(getClass());
039:
040: /**
041: * Thread pool of Connector sessions
042: */
043: protected WorkManager workManager = null;
044:
045: protected final ImmutableEndpoint endpoint;
046: protected final AbstractConnector connector;
047:
048: protected boolean disposed = false;
049:
050: protected ConnectionStrategy connectionStrategy;
051:
052: protected volatile boolean connecting = false;
053: protected volatile boolean connected = false;
054:
055: public AbstractConnectable(ImmutableEndpoint endpoint) {
056: this .endpoint = endpoint;
057: this .connector = (AbstractConnector) endpoint.getConnector();
058:
059: connectionStrategy = endpoint.getConnectionStrategy();
060: if (connectionStrategy instanceof AbstractConnectionStrategy) {
061: // We don't want to do threading in the dispatcher because we're either
062: // already running in a worker thread (asynchronous) or we need to
063: // complete the operation in a single thread
064: final AbstractConnectionStrategy connStrategy = (AbstractConnectionStrategy) connectionStrategy;
065: if (connStrategy.isDoThreading()) {
066: if (logger.isDebugEnabled()) {
067: logger.debug("Overriding doThreading to false on "
068: + connStrategy);
069: }
070: connStrategy.setDoThreading(false);
071: }
072: }
073:
074: if (isDoThreading()) {
075: try {
076: workManager = connector.getDispatcherWorkManager();
077: } catch (MuleException e) {
078: disposeAndLogException();
079: throw new MuleRuntimeException(CoreMessages
080: .failedToStart("WorkManager"), e);
081: }
082: }
083: }
084:
085: protected void disposeAndLogException() {
086: try {
087: dispose();
088: } catch (Throwable t) {
089: logger.error(
090: "Could not dispose of the message dispatcher!", t);
091: }
092: }
093:
094: /*
095: * (non-Javadoc)
096: *
097: * @see org.mule.util.ExceptionListener#onException(java.lang.Throwable)
098: */
099: public void exceptionThrown(Exception e) {
100: try {
101: getConnector().handleException(e);
102: } finally {
103: dispose();
104: }
105: }
106:
107: public boolean validate() {
108: // by default a dispatcher/requester can be used unless disposed
109: return !disposed;
110: }
111:
112: public void activate() {
113: // nothing to do by default
114: }
115:
116: public void passivate() {
117: // nothing to do by default
118: }
119:
120: /**
121: * Template method to destroy any resources held by the Message Dispatcher
122: */
123: public final synchronized void dispose() {
124: if (!disposed) {
125: try {
126: try {
127: this .disconnect();
128: } catch (Exception e) {
129: // TODO MULE-863: What should we really do?
130: logger.warn(e.getMessage(), e);
131: }
132:
133: this .doDispose();
134:
135: if (workManager != null) {
136: workManager.dispose();
137: }
138: } finally {
139: disposed = true;
140: }
141: }
142: }
143:
144: public Connector getConnector() {
145: return connector;
146: }
147:
148: public ImmutableEndpoint getEndpoint() {
149: return endpoint;
150: }
151:
152: public synchronized void connect() throws Exception {
153: if (disposed) {
154: throw new IllegalStateException(
155: "Requester/dispatcher has been disposed; cannot connect to resource");
156: }
157:
158: if (connected) {
159: return;
160: }
161:
162: if (!connecting) {
163: connecting = true;
164:
165: if (logger.isDebugEnabled()) {
166: logger.debug("Connecting: " + this );
167: }
168:
169: connectionStrategy.connect(this );
170:
171: logger.info("Connected: " + this );
172: return;
173: }
174:
175: try {
176: //Make sure the connector has connected. If it is connected, this method does nothing
177: connectionStrategy.connect(connector);
178:
179: this .doConnect();
180: connected = true;
181: connecting = false;
182:
183: connector.fireNotification(new ConnectionNotification(this ,
184: getConnectEventId(endpoint),
185: ConnectionNotification.CONNECTION_CONNECTED));
186: } catch (Exception e) {
187: connected = false;
188: connecting = false;
189:
190: connector.fireNotification(new ConnectionNotification(this ,
191: getConnectEventId(endpoint),
192: ConnectionNotification.CONNECTION_FAILED));
193:
194: if (e instanceof ConnectException) {
195: throw (ConnectException) e;
196: } else {
197: throw new ConnectException(e, this );
198: }
199: }
200: }
201:
202: public synchronized void disconnect() throws Exception {
203: if (!connected) {
204: return;
205: }
206:
207: if (logger.isDebugEnabled()) {
208: logger.debug("Disconnecting: " + this );
209: }
210:
211: this .doDisconnect();
212: connected = false;
213:
214: logger.info("Disconnected: " + this );
215:
216: connector.fireNotification(new ConnectionNotification(this ,
217: getConnectEventId(endpoint),
218: ConnectionNotification.CONNECTION_DISCONNECTED));
219: }
220:
221: protected String getConnectEventId(ImmutableEndpoint endpoint) {
222: return connector.getName() + ".dispatcher("
223: + endpoint.getEndpointURI().getUri() + ")";
224: }
225:
226: public final boolean isConnected() {
227: return connected;
228: }
229:
230: protected boolean isDoThreading() {
231: return connector.getDispatcherThreadingProfile()
232: .isDoThreading();
233: }
234:
235: /**
236: * Returns a string identifying the underlying resource
237: *
238: * @return
239: */
240: public String getConnectionDescription() {
241: return endpoint.getEndpointURI().toString();
242: }
243:
244: public synchronized void reconnect() throws Exception {
245: disconnect();
246: connect();
247: }
248:
249: protected abstract void doDispose();
250:
251: protected abstract void doConnect() throws Exception;
252:
253: protected abstract void doDisconnect() throws Exception;
254:
255: // @Override
256: public String toString() {
257: final StringBuffer sb = new StringBuffer(80);
258: sb.append(ClassUtils.getSimpleName(this .getClass()));
259: sb.append("{this=").append(
260: Integer.toHexString(System.identityHashCode(this )));
261: sb.append(", endpoint=").append(
262: endpoint.getEndpointURI().getUri());
263: sb.append(", disposed=").append(disposed);
264: sb.append('}');
265: return sb.toString();
266: }
267: }
|