001: /*
002: * $Id: RemoteDispatcherComponent.java 11376 2008-03-16 17:44:10Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.module.client.remoting;
012:
013: import org.mule.DefaultMuleEvent;
014: import org.mule.DefaultMuleMessage;
015: import org.mule.DefaultMuleSession;
016: import org.mule.MuleServer;
017: import org.mule.RequestContext;
018: import org.mule.api.DefaultMuleException;
019: import org.mule.api.MuleContext;
020: import org.mule.api.MuleEvent;
021: import org.mule.api.MuleEventContext;
022: import org.mule.api.MuleException;
023: import org.mule.api.MuleMessage;
024: import org.mule.api.MuleSession;
025: import org.mule.api.config.MuleProperties;
026: import org.mule.api.endpoint.EndpointBuilder;
027: import org.mule.api.endpoint.EndpointFactory;
028: import org.mule.api.endpoint.ImmutableEndpoint;
029: import org.mule.api.endpoint.InboundEndpoint;
030: import org.mule.api.endpoint.OutboundEndpoint;
031: import org.mule.api.lifecycle.Callable;
032: import org.mule.api.lifecycle.Initialisable;
033: import org.mule.api.lifecycle.InitialisationException;
034: import org.mule.api.lifecycle.LifecycleTransitionResult;
035: import org.mule.api.service.Service;
036: import org.mule.api.transformer.TransformerException;
037: import org.mule.api.transformer.wire.WireFormat;
038: import org.mule.component.SimpleCallableJavaComponent;
039: import org.mule.config.i18n.CoreMessages;
040: import org.mule.endpoint.EndpointURIEndpointBuilder;
041: import org.mule.message.DefaultExceptionPayload;
042: import org.mule.model.seda.SedaService;
043: import org.mule.module.client.remoting.notification.RemoteDispatcherNotification;
044: import org.mule.object.PrototypeObjectFactory;
045: import org.mule.transport.AbstractConnector;
046: import org.mule.transport.NullPayload;
047: import org.mule.util.MapUtils;
048:
049: import java.io.ByteArrayInputStream;
050: import java.util.HashMap;
051: import java.util.LinkedList;
052: import java.util.List;
053: import java.util.Map;
054:
055: import org.apache.commons.io.output.ByteArrayOutputStream;
056: import org.apache.commons.logging.Log;
057: import org.apache.commons.logging.LogFactory;
058:
059: /**
060: * <code>RemoteDispatcherComponent</code> is a MuleManager interal server component
061: * responsible for receiving remote requests and dispatching them locally. This
062: * allows developer to tunnel requests through http ssl to a Mule instance behind a
063: * firewall
064: */
065:
066: public class RemoteDispatcherComponent implements Callable,
067: Initialisable {
068: /**
069: * logger used by this class
070: */
071: protected static final Log logger = LogFactory
072: .getLog(RemoteDispatcherComponent.class);
073:
074: public static final String MANAGER_COMPONENT_NAME = "_muleManagerComponent";
075:
076: /**
077: * Use Serialization by default
078: */
079: protected WireFormat wireFormat;
080:
081: protected String encoding;
082:
083: protected int synchronousEventTimeout = 5000;
084:
085: public LifecycleTransitionResult initialise()
086: throws InitialisationException {
087: if (wireFormat == null) {
088: throw new InitialisationException(CoreMessages
089: .objectIsNull("wireFormat"), this );
090: }
091: return LifecycleTransitionResult.OK;
092: }
093:
094: public Object onCall(MuleEventContext context) throws Exception {
095: if (context.transformMessageToString().equals(
096: ServerHandshake.SERVER_HANDSHAKE_PROPERTY)) {
097: return doHandshake(context);
098: }
099:
100: Object result;
101: logger.debug("Message received by RemoteDispatcherComponent");
102: ByteArrayInputStream in = new ByteArrayInputStream(context
103: .transformMessageToBytes());
104: RemoteDispatcherNotification action = (RemoteDispatcherNotification) ((MuleMessage) wireFormat
105: .read(in)).getPayload();
106:
107: if (RemoteDispatcherNotification.ACTION_INVOKE == action
108: .getAction()) {
109: result = invokeAction(action, context);
110: } else if (RemoteDispatcherNotification.ACTION_SEND == action
111: .getAction()
112: || RemoteDispatcherNotification.ACTION_DISPATCH == action
113: .getAction()) {
114: result = sendAction(action, context);
115: } else if (RemoteDispatcherNotification.ACTION_RECEIVE == action
116: .getAction()) {
117: result = receiveAction(action, context);
118: } else {
119: result = handleException(
120: null,
121: new DefaultMuleException(
122: CoreMessages
123: .eventTypeNotRecognised("RemoteDispatcherNotification:"
124: + action.getAction())));
125: }
126: return result;
127: }
128:
129: protected ServerHandshake doHandshake(MuleEventContext context)
130: throws TransformerException {
131: ServerHandshake handshake = new ServerHandshake();
132: handshake.setWireFormatClass(wireFormat.getClass().getName());
133: return handshake;
134: }
135:
136: protected Object invokeAction(RemoteDispatcherNotification action,
137: MuleEventContext context) throws MuleException {
138: String destComponent;
139: MuleMessage result = null;
140: String endpoint = action.getResourceIdentifier();
141: if (action.getResourceIdentifier().startsWith("mule:")) {
142: destComponent = endpoint.substring(endpoint
143: .lastIndexOf("/") + 1);
144: } else {
145: destComponent = endpoint;
146: }
147:
148: if (destComponent != null) {
149: MuleSession session = new DefaultMuleSession(context
150: .getMuleContext().getRegistry().lookupService(
151: destComponent), context.getMuleContext());
152: // Need to do this otherise when the event is invoked the
153: // transformer associated with the Mule Admin queue will be invoked, but
154: // the message will not be of expected type
155: MuleContext managementContext = MuleServer.getMuleContext();
156: EndpointBuilder builder = new EndpointURIEndpointBuilder(
157: RequestContext.getEvent().getEndpoint(),
158: managementContext);
159: // TODO - is this correct? it stops any other transformer from being set
160: builder.setTransformers(new LinkedList());
161: ImmutableEndpoint ep = managementContext.getRegistry()
162: .lookupEndpointFactory()
163: .getInboundEndpoint(builder);
164: MuleEvent event = new DefaultMuleEvent(action.getMessage(),
165: ep, context.getSession(), context.isSynchronous());
166: event = RequestContext.setEvent(event);
167:
168: if (context.isSynchronous()) {
169: result = session.getService().sendEvent(event);
170: ByteArrayOutputStream out = new ByteArrayOutputStream();
171: wireFormat.write(out, result, getEncoding());
172: return out.toByteArray();
173: } else {
174: session.getService().dispatchEvent(event);
175: return null;
176: }
177: } else {
178: return handleException(
179: result,
180: new DefaultMuleException(
181: CoreMessages
182: .couldNotDetermineDestinationComponentFromEndpoint(endpoint)));
183: }
184: }
185:
186: protected Object sendAction(RemoteDispatcherNotification action,
187: MuleEventContext context) throws MuleException {
188: MuleMessage result = null;
189: OutboundEndpoint endpoint = null;
190: MuleContext managementContext = context.getMuleContext();
191: try {
192: if (RemoteDispatcherNotification.ACTION_DISPATCH == action
193: .getAction()) {
194: endpoint = managementContext.getRegistry()
195: .lookupEndpointFactory().getOutboundEndpoint(
196: action.getResourceIdentifier());
197: context.dispatchEvent(action.getMessage(), endpoint);
198: return null;
199: } else {
200: EndpointFactory endpointFactory = managementContext
201: .getRegistry().lookupEndpointFactory();
202: EndpointBuilder endpointBuilder = endpointFactory
203: .getEndpointBuilder(action
204: .getResourceIdentifier());
205: endpointBuilder.setRemoteSync(true);
206: endpoint = managementContext.getRegistry()
207: .lookupEndpointFactory().getOutboundEndpoint(
208: endpointBuilder);
209: result = context.sendEvent(action.getMessage(),
210: endpoint);
211: if (result == null) {
212: return null;
213: } else {
214: ByteArrayOutputStream out = new ByteArrayOutputStream();
215: wireFormat.write(out, result, getEncoding());
216: return out.toByteArray();
217: }
218: }
219: } catch (Exception e) {
220: return handleException(result, e);
221: }
222: }
223:
224: protected Object receiveAction(RemoteDispatcherNotification action,
225: MuleEventContext context) throws MuleException {
226: MuleMessage result = null;
227: try {
228: ImmutableEndpoint endpoint = context
229: .getMuleContext()
230: .getRegistry()
231: .lookupEndpointFactory()
232: .getOutboundEndpoint(action.getResourceIdentifier());
233:
234: long timeout = MapUtils.getLongValue(
235: action.getProperties(),
236: MuleProperties.MULE_EVENT_TIMEOUT_PROPERTY,
237: getSynchronousEventTimeout());
238:
239: result = endpoint.getConnector().request(
240: action.getResourceIdentifier(), timeout);
241: if (result != null) {
242: // See if there is a default transformer on the connector
243: List transformers = ((AbstractConnector) endpoint
244: .getConnector())
245: .getDefaultInboundTransformers();
246: if (transformers != null) {
247: result.applyTransformers(transformers);
248: }
249: ByteArrayOutputStream out = new ByteArrayOutputStream();
250: wireFormat.write(out, result, getEncoding());
251: return out.toByteArray();
252: } else {
253: return null;
254: }
255: } catch (Exception e) {
256: return handleException(result, e);
257: }
258:
259: }
260:
261: public static final Service getSerivce(InboundEndpoint endpoint,
262: WireFormat wireFormat, String encoding, int eventTimeout,
263: MuleContext managementContext) throws MuleException {
264: try {
265: Service service = new SedaService();
266: service.setName(MANAGER_COMPONENT_NAME);
267: service.setModel(managementContext.getRegistry()
268: .lookupSystemModel());
269:
270: Map props = new HashMap();
271: props.put("wireFormat", wireFormat);
272: props.put("encoding", encoding);
273: props.put("synchronousEventTimeout", new Integer(
274: eventTimeout));
275: service.setComponent(new SimpleCallableJavaComponent(
276: new PrototypeObjectFactory(
277: RemoteDispatcherComponent.class, props)));
278:
279: service.setMuleContext(managementContext);
280: service.getInboundRouter().addEndpoint(endpoint);
281:
282: return service;
283: } catch (Exception e) {
284: throw new InitialisationException(e, null);
285: }
286: }
287:
288: /**
289: * Wraps an exception into a MuleMessage with an Exception payload and returns
290: * the Xml representation of it
291: *
292: * @param result the result of the invocation or null if the exception occurred
293: * before or during the invocation
294: * @param e the Exception thrown
295: * @return an Xml String message result
296: */
297: protected Object handleException(MuleMessage result, Throwable e) {
298: logger.error("Failed to process admin request: "
299: + e.getMessage(), e);
300: if (result == null) {
301: result = new DefaultMuleMessage(NullPayload.getInstance(),
302: (Map) null);
303: }
304: result.setExceptionPayload(new DefaultExceptionPayload(e));
305: try {
306: ByteArrayOutputStream out = new ByteArrayOutputStream();
307: wireFormat.write(out, result, getEncoding());
308: return out.toByteArray();
309: } catch (Exception e1) {
310: // TODO MULE-863: Is this sufficient?
311: // log the inner exception here since the earlier exception was logged earlier
312: logger
313: .error("Failed to format message, using direct string (details at debug level): "
314: + e1.getMessage());
315: logger.debug(e1.toString(), e1);
316: return e.getMessage();
317: }
318: }
319:
320: public WireFormat getWireFormat() {
321: return wireFormat;
322: }
323:
324: public void setWireFormat(WireFormat wireFormat) {
325: this .wireFormat = wireFormat;
326: }
327:
328: public String getEncoding() {
329: return encoding;
330: }
331:
332: public void setEncoding(String encoding) {
333: this .encoding = encoding;
334: }
335:
336: public int getSynchronousEventTimeout() {
337: return synchronousEventTimeout;
338: }
339:
340: public void setSynchronousEventTimeout(int synchronousEventTimeout) {
341: this.synchronousEventTimeout = synchronousEventTimeout;
342: }
343: }
|