001: /*
002: * $Id: DefaultMuleEvent.java 11035 2008-02-26 17:06:34Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule;
012:
013: import org.mule.api.DefaultMuleException;
014: import org.mule.api.MuleContext;
015: import org.mule.api.MuleEvent;
016: import org.mule.api.MuleException;
017: import org.mule.api.MuleMessage;
018: import org.mule.api.MuleSession;
019: import org.mule.api.ThreadSafeAccess;
020: import org.mule.api.config.MuleProperties;
021: import org.mule.api.endpoint.ImmutableEndpoint;
022: import org.mule.api.security.Credentials;
023: import org.mule.api.service.Service;
024: import org.mule.api.transformer.TransformerException;
025: import org.mule.api.transport.PropertyScope;
026: import org.mule.config.i18n.CoreMessages;
027: import org.mule.endpoint.DefaultEndpointFactory;
028: import org.mule.security.MuleCredentials;
029: import org.mule.util.MapUtils;
030: import org.mule.util.UUID;
031:
032: import java.io.IOException;
033: import java.io.ObjectInputStream;
034: import java.io.ObjectOutputStream;
035: import java.io.OutputStream;
036: import java.io.UnsupportedEncodingException;
037: import java.util.EventObject;
038: import java.util.Iterator;
039:
040: import org.apache.commons.beanutils.PropertyUtils;
041: import org.apache.commons.logging.Log;
042: import org.apache.commons.logging.LogFactory;
043:
/**
 * <code>DefaultMuleEvent</code> represents any data event occurring in the Mule
 * environment. All data sent or received within the Mule environment will be passed
 * between components as a MuleEvent. <p/> The MuleEvent holds some data and provides
 * helper methods for obtaining the data in a format that the receiving Mule UMO
 * understands. The event can also maintain any number of properties that can be set
 * and retrieved by Mule UMO components.
 */
052:
053: public class DefaultMuleEvent extends EventObject implements MuleEvent,
054: ThreadSafeAccess {
055: /**
056: * Serial version
057: */
058: private static final long serialVersionUID = 1L;
059: /**
060: * logger used by this class
061: */
062: protected transient Log logger = LogFactory.getLog(getClass());
063: /**
064: * The endpoint associated with the event
065: */
066: private transient ImmutableEndpoint endpoint = null;
067:
068: /**
069: * the Universally Unique ID for the event
070: */
071: private String id = null;
072:
073: /**
074: * The payload message used to read the payload of the event
075: */
076: private MuleMessage message = null;
077:
078: private transient MuleSession session;
079:
080: private boolean stopFurtherProcessing = false;
081:
082: private boolean synchronous = false;
083:
084: private int timeout = TIMEOUT_NOT_SET_VALUE;
085:
086: private transient ResponseOutputStream outputStream = null;
087:
088: private transient Object transformedMessage = null;
089:
090: private Credentials credentials = null;
091:
092: protected String[] ignoredPropertyOverrides = new String[] { MuleProperties.MULE_METHOD_PROPERTY };
093:
094: /**
095: * Properties cache that only reads properties once from the inbound message and
096: * merges them with any properties on the endpoint. The message properties take
097: * precedence over the endpoint properties
098: */
099: public DefaultMuleEvent(MuleMessage message,
100: ImmutableEndpoint endpoint, Service service,
101: MuleEvent previousEvent) {
102: super (message.getPayload());
103: this .message = message;
104: this .id = generateEventId();
105: this .session = previousEvent.getSession();
106: ((DefaultMuleSession) session).setService(service);
107: this .endpoint = endpoint;
108: this .synchronous = previousEvent.isSynchronous();
109: this .timeout = previousEvent.getTimeout();
110: this .outputStream = (ResponseOutputStream) previousEvent
111: .getOutputStream();
112: fillProperties(previousEvent);
113: }
114:
/**
 * Creates an event with a generated id and no response output stream by
 * delegating to the constructor that accepts a <code>ResponseOutputStream</code>.
 *
 * @param message the message carried by the event
 * @param endpoint the endpoint to associate with the event
 * @param session the session the event belongs to
 * @param synchronous whether the event is processed synchronously
 */
public DefaultMuleEvent(MuleMessage message, ImmutableEndpoint endpoint, MuleSession session,
        boolean synchronous) {
    this(message, endpoint, session, synchronous, (ResponseOutputStream) null);
}
120:
/**
 * Creates an event with a generated id. No previous event is involved, so only
 * the endpoint properties are merged onto the message.
 *
 * @param message the message carried by the event; its payload is passed to the
 *            <code>EventObject</code> constructor as the event source
 * @param endpoint the endpoint to associate with the event
 * @param session the session the event belongs to
 * @param synchronous whether the event is processed synchronously
 * @param outputStream stream that response data can be written to, or null
 * @see org.mule.api.transport.MessageAdapter
 */
public DefaultMuleEvent(MuleMessage message, ImmutableEndpoint endpoint, MuleSession session,
        boolean synchronous, ResponseOutputStream outputStream) {
    super(message.getPayload());
    this.message = message;
    this.endpoint = endpoint;
    this.session = session;
    this.id = generateEventId();
    this.outputStream = outputStream;
    this.synchronous = synchronous;
    // null: there is no previous event to inherit message properties from
    fillProperties(null);
}
141:
/**
 * Creates an event that uses a caller-supplied id instead of generating one.
 * No previous event is involved, so only the endpoint properties are merged
 * onto the message.
 *
 * @param message the message carried by the event; its payload is passed to the
 *            <code>EventObject</code> constructor as the event source
 * @param endpoint the endpoint to associate with the event
 * @param session the session the event belongs to
 * @param eventId the id to assign to the event
 * @param synchronous whether the event is processed synchronously
 * @see org.mule.api.transport.MessageAdapter
 */
public DefaultMuleEvent(MuleMessage message, ImmutableEndpoint endpoint, MuleSession session,
        String eventId, boolean synchronous) {
    super(message.getPayload());
    this.id = eventId;
    this.message = message;
    this.endpoint = endpoint;
    this.session = session;
    this.synchronous = synchronous;
    // null: there is no previous event to inherit message properties from
    fillProperties(null);
}
161:
162: /**
163: * A helper constructor used to rewrite an event payload
164: *
165: * @param message
166: * @param rewriteEvent
167: */
168: public DefaultMuleEvent(MuleMessage message, MuleEvent rewriteEvent) {
169: super (message.getPayload());
170: this .message = message;
171: this .id = rewriteEvent.getId();
172: this .session = rewriteEvent.getSession();
173: ((DefaultMuleSession) session).setService(rewriteEvent
174: .getService());
175: this .endpoint = rewriteEvent.getEndpoint();
176: this .synchronous = rewriteEvent.isSynchronous();
177: this .timeout = rewriteEvent.getTimeout();
178: this .outputStream = (ResponseOutputStream) rewriteEvent
179: .getOutputStream();
180: if (rewriteEvent instanceof DefaultMuleEvent) {
181: this .transformedMessage = ((DefaultMuleEvent) rewriteEvent)
182: .getCachedMessage();
183: }
184: fillProperties(rewriteEvent);
185: }
186:
187: protected void fillProperties(MuleEvent previousEvent) {
188: if (previousEvent != null) {
189: MuleMessage msg = previousEvent.getMessage();
190: synchronized (msg) {
191: for (Iterator iterator = msg.getPropertyNames()
192: .iterator(); iterator.hasNext();) {
193: String prop = (String) iterator.next();
194: Object value = msg.getProperty(prop);
195: // don't overwrite property on the message
196: if (!ignoreProperty(prop)) {
197: message.setProperty(prop, value);
198: }
199: }
200: }
201: }
202:
203: if (endpoint != null && endpoint.getProperties() != null) {
204: for (Iterator iterator = endpoint.getProperties().keySet()
205: .iterator(); iterator.hasNext();) {
206: String prop = (String) iterator.next();
207: Object value = endpoint.getProperties().get(prop);
208: // don't overwrite property on the message
209: if (!ignoreProperty(prop)) {
210: message.setProperty(prop, value,
211: PropertyScope.INVOCATION);
212: }
213: }
214: }
215:
216: setCredentials();
217: }
218:
219: /**
220: * This method is used to determine if a property on the previous event should be
221: * ignorred for the next event. This method is here because we don't have proper
222: * scoped handlng of meta data yet The rules are
223: * <ol>
224: * <li>If a property is already set on the currect event don't overwrite with the previous event value
225: * <li>If the propery name appears in the ignorredPropertyOverrides list, then we always set it on the new event
226: * </ol>
227: *
228: * @param key
229: * @return
230: */
231: protected boolean ignoreProperty(String key) {
232: if (key == null) {
233: return true;
234: }
235:
236: for (int i = 0; i < ignoredPropertyOverrides.length; i++) {
237: if (key.equals(ignoredPropertyOverrides[i])) {
238: return false;
239: }
240: }
241:
242: return null != message.getProperty(key);
243: }
244:
245: protected void setCredentials() {
246: if (null != endpoint && null != endpoint.getEndpointURI()
247: && null != endpoint.getEndpointURI().getUserInfo()) {
248: final String userName = endpoint.getEndpointURI().getUser();
249: final String password = endpoint.getEndpointURI()
250: .getPassword();
251: if (password != null && userName != null) {
252: credentials = new MuleCredentials(userName, password
253: .toCharArray());
254: }
255: }
256: }
257:
258: public Credentials getCredentials() {
259: return credentials;
260: }
261:
262: Object getCachedMessage() {
263: return transformedMessage;
264: }
265:
266: public MuleMessage getMessage() {
267: return message;
268: }
269:
270: public byte[] getMessageAsBytes() throws DefaultMuleException {
271: try {
272: return message.getPayloadAsBytes();
273: } catch (Exception e) {
274: throw new DefaultMuleException(CoreMessages
275: .cannotReadPayloadAsBytes(message.getPayload()
276: .getClass().getName()), e);
277: }
278: }
279:
280: public Object transformMessage() throws TransformerException {
281: return transformMessage(null);
282: }
283:
284: public Object transformMessage(Class outputType)
285: throws TransformerException {
286: message.applyTransformers(endpoint.getTransformers());
287: if (outputType == null) {
288: return message.getPayload();
289: } else {
290: return message.getPayload(outputType);
291: }
292: }
293:
294: /**
295: * This method will attempt to convert the transformed message into an array of
296: * bytes It will first check if the result of the transformation is a byte array
297: * and return that. Otherwise if the the result is a string it will serialized
298: * the CONTENTS of the string not the String object. finally it will check if the
299: * result is a Serializable object and convert that to an array of bytes.
300: *
301: * @return a byte[] representation of the message
302: * @throws TransformerException if an unsupported encoding is being used or if
303: * the result message is not a String byte[] or Seializable object
304: */
305: public byte[] transformMessageToBytes() throws TransformerException {
306: Object obj = transformMessage(byte[].class);
307: return (byte[]) obj;
308: }
309:
310: /**
311: * Returns the message transformed into it's recognised or expected format and
312: * then into a String. The transformer used is the one configured on the endpoint
313: * through which this event was received.
314: *
315: * @return the message transformed into it's recognised or expected format as a
316: * Strings.
317: * @throws org.mule.api.transformer.TransformerException if a failure occurs in
318: * the transformer
319: * @see org.mule.api.transformer.Transformer
320: */
321: public String transformMessageToString()
322: throws TransformerException {
323: try {
324: return new String(transformMessageToBytes(), getEncoding());
325: } catch (UnsupportedEncodingException e) {
326: throw new TransformerException(endpoint.getTransformers(),
327: e);
328: }
329:
330: /* TODO MULE-2691 Note that the above code actually transforms the message
331: * to byte[] instead of String. The following code would transform the
332: * message to a String but breaks some tests in transports/http:
333: *
334: transformMessageToBytes();
335:
336: ByteArrayToObject t = new ByteArrayToObject();
337: t.setEncoding(getEncoding());
338: List list = new ArrayList();
339: list.add(t);
340: message.applyTransformers(list);
341:
342: return (String) message.getPayload();
343: */
344: }
345:
346: public String getMessageAsString() throws MuleException {
347: return getMessageAsString(getEncoding());
348: }
349:
350: /**
351: * Returns the message contents as a string
352: *
353: * @param encoding the encoding to use when converting the message to string
354: * @return the message contents as a string
355: * @throws org.mule.api.MuleException if the message cannot be converted into a
356: * string
357: */
358: public String getMessageAsString(String encoding)
359: throws MuleException {
360: try {
361: return message.getPayloadAsString(encoding);
362: } catch (Exception e) {
363: throw new DefaultMuleException(CoreMessages
364: .cannotReadPayloadAsString(message.getClass()
365: .getName()), e);
366: }
367: }
368:
369: /*
370: * (non-Javadoc)
371: *
372: * @see org.mule.api.MuleEvent#getId()
373: */
374: public String getId() {
375: return id;
376: }
377:
378: /**
379: * @see org.mule.api.MuleEvent#getProperty(java.lang.String, boolean)
380: */
381: public Object getProperty(String name, boolean exhaustiveSearch) {
382: return getProperty(name, /* defaultValue */null,
383: exhaustiveSearch);
384: }
385:
386: /*
387: * (non-Javadoc)
388: *
389: * @see org.mule.api.MuleEvent#getProperty(java.lang.String, java.lang.Object,
390: * boolean)
391: */
392: public Object getProperty(String name, Object defaultValue,
393: boolean exhaustiveSearch) {
394: Object property = message.getProperty(name);
395:
396: if (exhaustiveSearch) {
397: // Search the endpoint
398: if (property == null) {
399: property = MapUtils.getObject(getEndpoint()
400: .getEndpointURI().getParams(), name, null);
401: }
402:
403: // Search the connector
404: if (property == null) {
405: try {
406: property = PropertyUtils.getProperty(getEndpoint()
407: .getConnector(), name);
408: } catch (Exception e) {
409: // Ignore this exception, it just means that the connector has no
410: // such property.
411: }
412: }
413: }
414: return (property == null ? defaultValue : property);
415: }
416:
417: /*
418: * (non-Javadoc)
419: *
420: * @see org.mule.api.MuleEvent#getEndpoint()
421: */
422: public ImmutableEndpoint getEndpoint() {
423: return endpoint;
424: }
425:
426: /*
427: * (non-Javadoc)
428: *
429: * @see java.lang.Object#toString()
430: */
431: public String toString() {
432: StringBuffer buf = new StringBuffer(64);
433: buf.append("MuleEvent: ").append(getId());
434: buf.append(", sync=").append(isSynchronous());
435: buf.append(", stop processing=").append(
436: isStopFurtherProcessing());
437: buf.append(", ").append(endpoint);
438:
439: return buf.toString();
440: }
441:
442: protected String generateEventId() {
443: return UUID.getUUID();
444: }
445:
446: public MuleSession getSession() {
447: return session;
448: }
449:
450: void setSession(MuleSession session) {
451: this .session = session;
452: }
453:
454: /**
455: * Gets the recipient service of this event
456: */
457: public Service getService() {
458: return session.getService();
459: }
460:
461: /**
462: * Determines whether the default processing for this event will be executed
463: *
464: * @return Returns the stopFurtherProcessing.
465: */
466: public boolean isStopFurtherProcessing() {
467: return stopFurtherProcessing;
468: }
469:
470: /**
471: * Setting this parameter will stop the Mule framework from processing this event
472: * in the standard way. This allow for client code to override default behaviour.
473: * The common reasons for doing this are - 1. The UMO has more than one send
474: * endpoint configured; the service must dispatch to other prviders
475: * programatically by using the service on the current event 2. The UMO doesn't
476: * send the current event out through a endpoint. i.e. the processing of the
477: * event stops in the uMO.
478: *
479: * @param stopFurtherProcessing The stopFurtherProcessing to set.
480: */
481: public void setStopFurtherProcessing(boolean stopFurtherProcessing) {
482: this .stopFurtherProcessing = stopFurtherProcessing;
483: }
484:
485: public boolean equals(Object o) {
486: if (this == o) {
487: return true;
488: }
489: if (!(o instanceof DefaultMuleEvent)) {
490: return false;
491: }
492:
493: final DefaultMuleEvent event = (DefaultMuleEvent) o;
494:
495: if (message != null ? !message.equals(event.message)
496: : event.message != null) {
497: return false;
498: }
499: return id.equals(event.id);
500: }
501:
502: public int hashCode() {
503: return 29 * id.hashCode()
504: + (message != null ? message.hashCode() : 0);
505: }
506:
507: public boolean isSynchronous() {
508: return synchronous;
509: }
510:
511: public void setSynchronous(boolean value) {
512: synchronous = value;
513: }
514:
515: public int getTimeout() {
516: if (timeout == TIMEOUT_NOT_SET_VALUE) {
517: // If this is not set it will use the default timeout value
518: timeout = endpoint.getRemoteSyncTimeout();
519: }
520: return timeout;
521: }
522:
523: public void setTimeout(int timeout) {
524: this .timeout = timeout;
525: }
526:
527: /**
528: * An outputstream can optionally be used to write response data to an incoming
529: * message.
530: *
531: * @return an output strem if one has been made available by the message receiver
532: * that received the message
533: */
534: public OutputStream getOutputStream() {
535: return outputStream;
536: }
537:
538: private void writeObject(ObjectOutputStream out) throws IOException {
539: out.defaultWriteObject();
540: out.writeInt(endpoint.hashCode());
541: }
542:
543: private void readObject(ObjectInputStream in) throws IOException,
544: ClassNotFoundException {
545: logger = LogFactory.getLog(getClass());
546: in.defaultReadObject();
547: int hashCode = in.readInt();
548: endpoint = (ImmutableEndpoint) RegistryContext.getRegistry()
549: .lookupObject(
550: DefaultEndpointFactory.ENDPOINT_REGISTRY_PREFIX
551: + hashCode);
552: }
553:
554: /**
555: * Gets the encoding for this message. First it looks to see if encoding has been
556: * set on the endpoint, if not it will check the message itself and finally it
557: * will fall back to the Mule global configuration for encoding which cannot be
558: * null.
559: *
560: * @return the encoding for the event
561: */
562: public String getEncoding() {
563: String encoding = message.getEncoding();
564: if (encoding == null) {
565: encoding = endpoint.getEncoding();
566: }
567:
568: return encoding;
569: }
570:
571: public MuleContext getMuleContext() {
572: return endpoint.getMuleContext();
573: }
574:
575: public ThreadSafeAccess newThreadCopy() {
576: if (message instanceof ThreadSafeAccess) {
577: DefaultMuleEvent copy = new DefaultMuleEvent(
578: (MuleMessage) ((ThreadSafeAccess) message)
579: .newThreadCopy(), this );
580: copy.resetAccessControl();
581: return copy;
582: } else {
583: return this ;
584: }
585: }
586:
587: public void resetAccessControl() {
588: if (message instanceof ThreadSafeAccess) {
589: ((ThreadSafeAccess) message).resetAccessControl();
590: }
591: }
592:
593: public void assertAccess(boolean write) {
594: if (message instanceof ThreadSafeAccess) {
595: ((ThreadSafeAccess) message).assertAccess(write);
596: }
597: }
598:
599: }
|