001: /*
002: * $Id: AbstractMessageDispatcher.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport;
012:
013: import org.mule.OptimizedRequestContext;
014: import org.mule.RequestContext;
015: import org.mule.api.MuleEvent;
016: import org.mule.api.MuleException;
017: import org.mule.api.MuleMessage;
018: import org.mule.api.config.MuleProperties;
019: import org.mule.api.endpoint.ImmutableEndpoint;
020: import org.mule.api.endpoint.OutboundEndpoint;
021: import org.mule.api.routing.ResponseRouterCollection;
022: import org.mule.api.transaction.Transaction;
023: import org.mule.api.transaction.TransactionException;
024: import org.mule.api.transport.DispatchException;
025: import org.mule.api.transport.MessageDispatcher;
026: import org.mule.context.notification.MessageNotification;
027: import org.mule.context.notification.SecurityNotification;
028: import org.mule.transaction.TransactionCoordination;
029:
030: import javax.resource.spi.work.Work;
031: import javax.resource.spi.work.WorkManager;
032:
033: /**
034: * Provide a default dispatch (client) support for handling threads lifecycle and validation.
035: */
036: public abstract class AbstractMessageDispatcher extends
037: AbstractConnectable implements MessageDispatcher {
038:
039: public AbstractMessageDispatcher(OutboundEndpoint endpoint) {
040: super (endpoint);
041: }
042:
043: /*
044: * (non-Javadoc)
045: *
046: * @see org.mule.api.transport.MessageDispatcher#dispatch(org.mule.api.MuleEvent)
047: */
048: public final void dispatch(MuleEvent event)
049: throws DispatchException {
050: event.setSynchronous(false);
051: event.getMessage().setProperty(
052: MuleProperties.MULE_ENDPOINT_PROPERTY,
053: event.getEndpoint().getEndpointURI().toString());
054: event = OptimizedRequestContext.criticalSetEvent(event); // MULE-2112
055:
056: // Apply Security filter if one is set
057: ImmutableEndpoint endpoint = event.getEndpoint();
058: if (endpoint.getSecurityFilter() != null) {
059: try {
060: endpoint.getSecurityFilter().authenticate(event);
061: } catch (org.mule.api.security.SecurityException e) {
062: // TODO MULE-863: Do we need this warning?
063: logger.warn(
064: "Outbound Request was made but was not authenticated: "
065: + e.getMessage(), e);
066: connector
067: .fireNotification(new SecurityNotification(
068: e,
069: SecurityNotification.ADMIN_EVENT_ACTION_START_RANGE));
070: connector.handleException(e);
071: return;
072: } catch (MuleException e) {
073: disposeAndLogException();
074: throw new DispatchException(event.getMessage(), event
075: .getEndpoint(), e);
076: }
077: }
078:
079: try {
080: Transaction tx = TransactionCoordination.getInstance()
081: .getTransaction();
082: if (isDoThreading() && !event.isSynchronous() && tx == null) {
083: workManager.scheduleWork(new Worker(event),
084: WorkManager.INDEFINITE, null, connector);
085: } else {
086: // Make sure we are connected
087: connectionStrategy.connect(this );
088: doDispatch(event);
089: if (connector.isEnableMessageEvents()) {
090: String component = null;
091: if (event.getService() != null) {
092: component = event.getService().getName();
093: }
094: connector.fireNotification(new MessageNotification(
095: event.getMessage(), event.getEndpoint(),
096: component,
097: MessageNotification.MESSAGE_DISPATCHED));
098: }
099: }
100: } catch (DispatchException e) {
101: disposeAndLogException();
102: throw e;
103: } catch (Exception e) {
104: disposeAndLogException();
105: throw new DispatchException(event.getMessage(), event
106: .getEndpoint(), e);
107: }
108: }
109:
110: public final MuleMessage send(MuleEvent event)
111: throws DispatchException {
112: // No point continuing if the service has rolledback the transaction
113: if (isTransactionRollback()) {
114: return event.getMessage();
115: }
116:
117: event.setSynchronous(true);
118: event.getMessage().setProperty(
119: MuleProperties.MULE_ENDPOINT_PROPERTY,
120: event.getEndpoint().getEndpointURI().getUri()
121: .toString());
122: event = OptimizedRequestContext.unsafeSetEvent(event);
123:
124: // Apply Security filter if one is set
125: ImmutableEndpoint endpoint = event.getEndpoint();
126: if (endpoint.getSecurityFilter() != null) {
127: try {
128: endpoint.getSecurityFilter().authenticate(event);
129: } catch (org.mule.api.security.SecurityException e) {
130: logger.warn(
131: "Outbound Request was made but was not authenticated: "
132: + e.getMessage(), e);
133: connector
134: .fireNotification(new SecurityNotification(
135: e,
136: SecurityNotification.SECURITY_AUTHENTICATION_FAILED));
137: connector.handleException(e);
138: return event.getMessage();
139: } catch (MuleException e) {
140: disposeAndLogException();
141: throw new DispatchException(event.getMessage(), event
142: .getEndpoint(), e);
143: }
144: }
145:
146: try {
147: // Make sure we are connected
148: connectionStrategy.connect(this );
149:
150: MuleMessage result = doSend(event);
151: if (connector.isEnableMessageEvents()) {
152: String component = null;
153: if (event.getService() != null) {
154: component = event.getService().getName();
155: }
156: connector.fireNotification(new MessageNotification(
157: event.getMessage(), event.getEndpoint(),
158: component, MessageNotification.MESSAGE_SENT));
159: }
160:
161: // Once a dispatcher has done its work we need to remove this property
162: // so that it is not propagated to the next request
163: if (result != null
164: && result.getPropertyNames().contains(
165: MuleProperties.MULE_REMOTE_SYNC_PROPERTY)) {
166: // result = RequestContext.safeMessageCopy(result);
167: result
168: .removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY);
169: }
170: return result;
171: } catch (DispatchException e) {
172: disposeAndLogException();
173: throw e;
174: } catch (Exception e) {
175: disposeAndLogException();
176: throw new DispatchException(event.getMessage(), event
177: .getEndpoint(), e);
178: }
179: }
180:
181: /**
182: * RemoteSync causes the message dispatch to wait for a response to an event on a
183: * response channel after it sends the event. The following rules apply to
184: * RemoteSync 1. The connector has to support remoteSync. Some transports do not
185: * have the notion of a response channel 2. Check if the endpoint has been
186: * configured for remoteSync 3. Check if the REMOTE_SYNC message header has been
187: * set 4. Finally, if the current service has a response router configured,
188: * that the router will handle the response channel event and we should not try
189: * and receive a response in the Message dispatcher If remotesync should not be
190: * used we must remove the REMOTE_SYNC header Note the MuleClient will
191: * automatically set the REMOTE_SYNC header when client.send(..) is called so
192: * that results are returned from remote invocations too.
193: *
194: * @param event the current event
195: * @return true if a response channel should be used to get a resposne from the
196: * event dispatch.
197: */
198: protected boolean useRemoteSync(MuleEvent event) {
199: boolean remoteSync = false;
200: if (event.getEndpoint().getConnector().isRemoteSyncEnabled()) {
201: remoteSync = event.getEndpoint().isRemoteSync()
202: || event.getMessage().getBooleanProperty(
203: MuleProperties.MULE_REMOTE_SYNC_PROPERTY,
204: false);
205: if (remoteSync) {
206: // service will be null for client calls
207: if (event.getService() != null) {
208: ResponseRouterCollection responseRouters = event
209: .getService().getResponseRouter();
210: if (responseRouters != null
211: && responseRouters.hasEndpoints()) {
212: remoteSync = false;
213: } else {
214: remoteSync = true;
215: }
216: }
217: }
218: }
219: if (!remoteSync) {
220: event.getMessage().removeProperty(
221: MuleProperties.MULE_REMOTE_SYNC_PROPERTY);
222: }
223: return remoteSync;
224: }
225:
226: private class Worker implements Work {
227: private MuleEvent event;
228:
229: public Worker(MuleEvent event) {
230: this .event = event;
231: }
232:
233: /*
234: * (non-Javadoc)
235: *
236: * @see java.lang.Runnable#run()
237: */
238: public void run() {
239: try {
240: event = RequestContext.setEvent(event);
241: // Make sure we are connected
242: connectionStrategy
243: .connect(AbstractMessageDispatcher.this );
244: AbstractMessageDispatcher.this .doDispatch(event);
245:
246: if (connector.isEnableMessageEvents()) {
247: String component = null;
248: if (event.getService() != null) {
249: component = event.getService().getName();
250: }
251:
252: connector.fireNotification(new MessageNotification(
253: event.getMessage(), event.getEndpoint(),
254: component,
255: MessageNotification.MESSAGE_DISPATCHED));
256: }
257: } catch (Exception e) {
258: AbstractMessageDispatcher.this .getConnector()
259: .handleException(e);
260: }
261: }
262:
263: public void release() {
264: // nothing to do
265: }
266: }
267:
268: /**
269: * Checks to see if the current transaction has been rolled back
270: *
271: * @return
272: */
273: protected boolean isTransactionRollback() {
274: try {
275: Transaction tx = TransactionCoordination.getInstance()
276: .getTransaction();
277: if (tx != null && tx.isRollbackOnly()) {
278: return true;
279: }
280: } catch (TransactionException e) {
281: // TODO MULE-863: What should we really do?
282: logger.warn(e.getMessage());
283: }
284: return false;
285: }
286:
287: protected abstract void doDispatch(MuleEvent event)
288: throws Exception;
289:
290: protected abstract MuleMessage doSend(MuleEvent event)
291: throws Exception;
292:
293: }
|