001: /*
002: * $Id: DefaultMuleMessage.java 11343 2008-03-13 10:58:26Z tcarlson $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule;
012:
013: import org.mule.api.ExceptionPayload;
014: import org.mule.api.MuleMessage;
015: import org.mule.api.MuleRuntimeException;
016: import org.mule.api.ThreadSafeAccess;
017: import org.mule.api.transformer.Transformer;
018: import org.mule.api.transformer.TransformerException;
019: import org.mule.api.transport.MessageAdapter;
020: import org.mule.api.transport.MutableMessageAdapter;
021: import org.mule.api.transport.PropertyScope;
022: import org.mule.config.i18n.CoreMessages;
023: import org.mule.transport.AbstractMessageAdapter;
024: import org.mule.transport.DefaultMessageAdapter;
025: import org.mule.transport.NullPayload;
026:
027: import java.io.InputStream;
028: import java.lang.reflect.Proxy;
029: import java.util.Iterator;
030: import java.util.List;
031: import java.util.Map;
032: import java.util.Set;
033:
034: import javax.activation.DataHandler;
035:
036: import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
037:
038: import org.apache.commons.logging.Log;
039: import org.apache.commons.logging.LogFactory;
040:
041: /**
042: * <code>DefaultMuleMessage</code> is a wrapper that contains a payload and properties
043: * associated with the payload.
044: */
045:
046: public class DefaultMuleMessage implements MuleMessage,
047: ThreadSafeAccess {
048: /** Serial version */
049: private static final long serialVersionUID = 1541720810851984842L;
050: private static Log logger = LogFactory
051: .getLog(DefaultMuleMessage.class);
052:
053: private MessageAdapter adapter;
054: private MessageAdapter originalAdapter = null;
055: private transient List appliedTransformerHashCodes = new CopyOnWriteArrayList();
056: private byte[] cache;
057:
058: public DefaultMuleMessage(Object message) {
059: this (message, (Map) null);
060: }
061:
062: public DefaultMuleMessage(Object message, Map properties) {
063: if (message instanceof MessageAdapter) {
064: adapter = (MessageAdapter) message;
065: } else {
066: adapter = new DefaultMessageAdapter(message);
067: }
068: addProperties(properties);
069: resetAccessControl();
070: }
071:
072: public DefaultMuleMessage(Object message, MessageAdapter previous) {
073: if (message instanceof MessageAdapter) {
074: adapter = (MessageAdapter) message;
075: ((ThreadSafeAccess) adapter).resetAccessControl();
076: } else {
077: adapter = new DefaultMessageAdapter(message, previous);
078: }
079: if (previous.getExceptionPayload() != null) {
080: setExceptionPayload(previous.getExceptionPayload());
081: }
082: setEncoding(previous.getEncoding());
083: if (previous.getAttachmentNames().size() > 0) {
084: Set attNames = adapter.getAttachmentNames();
085: synchronized (attNames) {
086: for (Iterator iterator = attNames.iterator(); iterator
087: .hasNext();) {
088: String s = (String) iterator.next();
089: try {
090: addAttachment(s, adapter.getAttachment(s));
091: } catch (Exception e) {
092: throw new MuleRuntimeException(CoreMessages
093: .failedToReadAttachment(s), e);
094: }
095: }
096: }
097: }
098: resetAccessControl();
099: }
100:
101: /** {@inheritDoc} */
102: public Object getPayload(Class outputType)
103: throws TransformerException {
104: return getPayload(outputType, getEncoding());
105: }
106:
107: /**
108: * Will attempt to obtain the payload of this message with the desired Class type. This will
109: * try and resolve a trnsformr that can do this transformation. If a transformer cannot be found
110: * an exception is thrown. Any transfromers added to the reqgistry will be checked for compatability
111: *
112: * @param outputType the desired return type
113: * @param encoding the encoding to use if required
114: * @return The converted payload of this message. Note that this method will not alter the payload of this
115: * message *unless* the payload is an inputstream in which case the stream will be read and the payload will become
116: * the fully read stream.
117: * @throws TransformerException if a transformer cannot be found or there is an error during transformation of the
118: * payload
119: */
120: protected Object getPayload(Class outputType, String encoding)
121: throws TransformerException {
122: //Handle null by ignoring the request
123: if (outputType == null) {
124: return getPayload();
125: }
126:
127: Class inputCls = getPayload().getClass();
128:
129: //Special case where proxies are used for testing
130: if (Proxy.isProxyClass(inputCls)) {
131: inputCls = inputCls.getInterfaces()[0];
132: }
133:
134: //If no conversion is necessary, just return the payload as-is
135: if (outputType.isAssignableFrom(inputCls)) {
136: return getPayload();
137: }
138: //Grab a list of transformers that batch out input/output requirements
139: // List transformers = RegistryContext.getRegistry().lookupTransformers(inputCls, outputType);
140:
141: //The transformer to execute on this message
142: Transformer transformer = null;
143: transformer = MuleServer.getMuleContext().getRegistry()
144: .lookupTransformer(inputCls, outputType);
145:
146: //no transformers found
147: if (transformer == null) {
148: throw new TransformerException(CoreMessages
149: .noTransformerFoundForMessage(inputCls, outputType));
150: }
151:
152: // Pass in the adapter itself, so we respect the encoding
153: Object result = transformer.transform(this );
154:
155: //TODO Unless we disallow Object.class as a valid return type we need to do this extra check
156: if (!outputType.isAssignableFrom(result.getClass())) {
157: throw new TransformerException(CoreMessages
158: .transformOnObjectNotOfSpecifiedType(outputType
159: .getName(), result.getClass()));
160: }
161: //If the payload is a stream and we've consumed it, then we should
162: //set the payload on the message
163: //This is the only time this method will alter the payload on the message
164: if (isPayloadConsumed(inputCls)) {
165: setPayload(result);
166: }
167:
168: return result;
169: }
170:
171: /**
172: * Checks if the payload has been consumed for this message. This only applies to Streaming payload types
173: * since once the stream has been read, the payload of the message should be updated to represent the data read
174: * from the stream
175: *
176: * @param inputCls the input type of the message payload
177: * @return true if the payload message type was stream-based, false otherwise
178: */
179: protected boolean isPayloadConsumed(Class inputCls) {
180: return InputStream.class.isAssignableFrom(inputCls);
181: }
182:
183: /** {@inheritDoc} */
184: public MessageAdapter getAdapter() {
185: return adapter;
186: }
187:
188: /** {@inheritDoc} */
189: public Object getOrginalPayload() {
190: return (originalAdapter == null ? adapter.getPayload()
191: : originalAdapter.getPayload());
192: }
193:
194: /** {@inheritDoc} */
195: public MessageAdapter getOriginalAdapter() {
196: return (originalAdapter == null ? adapter : originalAdapter);
197: }
198:
199: /** {@inheritDoc} */
200: public void setProperty(String key, Object value,
201: PropertyScope scope) {
202: adapter.setProperty(key, value, scope);
203: }
204:
205: /** {@inheritDoc} */
206: public Object getProperty(String key) {
207: return adapter.getProperty(key);
208: }
209:
210: /** {@inheritDoc} */
211: public Object removeProperty(String key) {
212: return adapter.removeProperty(key);
213: }
214:
215: /** {@inheritDoc} */
216: public void setProperty(String key, Object value) {
217: adapter.setProperty(key, value);
218: }
219:
220: /** {@inheritDoc} */
221: public final String getPayloadAsString() throws Exception {
222: assertAccess(READ);
223: return getPayloadAsString(getEncoding());
224: }
225:
226: /** {@inheritDoc} */
227: public byte[] getPayloadAsBytes() throws Exception {
228: assertAccess(READ);
229: if (cache != null) {
230: return cache;
231: }
232: byte[] result = (byte[]) getPayload(byte[].class);
233: if (MuleServer.getMuleContext().getConfiguration()
234: .isCacheMessageAsBytes()) {
235: cache = result;
236: }
237: return result;
238: }
239:
240: /** {@inheritDoc} */
241: public String getPayloadAsString(String encoding) throws Exception {
242: assertAccess(READ);
243: if (cache != null) {
244: return new String(cache, encoding);
245: }
246: String result = (String) getPayload(String.class);
247: if (MuleServer.getMuleContext().getConfiguration()
248: .isCacheMessageAsBytes()) {
249: cache = result.getBytes(encoding);
250: }
251: return result;
252: }
253:
254: /** {@inheritDoc} */
255: public Set getPropertyNames() {
256: return adapter.getPropertyNames();
257: }
258:
259: //** {@inheritDoc} */
260: public double getDoubleProperty(String name, double defaultValue) {
261: return adapter.getDoubleProperty(name, defaultValue);
262: }
263:
264: /** {@inheritDoc} */
265: public void setDoubleProperty(String name, double value) {
266: adapter.setDoubleProperty(name, value);
267: }
268:
269: /** {@inheritDoc} */
270: public String getUniqueId() {
271: return adapter.getUniqueId();
272: }
273:
274: /** {@inheritDoc} */
275: public Object getProperty(String name, Object defaultValue) {
276: return adapter.getProperty(name, defaultValue);
277: }
278:
279: /** {@inheritDoc} */
280: public int getIntProperty(String name, int defaultValue) {
281: return adapter.getIntProperty(name, defaultValue);
282: }
283:
284: /** {@inheritDoc} */
285: public long getLongProperty(String name, long defaultValue) {
286: return adapter.getLongProperty(name, defaultValue);
287: }
288:
289: /** {@inheritDoc} */
290: public boolean getBooleanProperty(String name, boolean defaultValue) {
291: return adapter.getBooleanProperty(name, defaultValue);
292: }
293:
294: /** {@inheritDoc} */
295: public void setBooleanProperty(String name, boolean value) {
296: adapter.setBooleanProperty(name, value);
297: }
298:
299: /** {@inheritDoc} */
300: public void setIntProperty(String name, int value) {
301: adapter.setIntProperty(name, value);
302: }
303:
304: /** {@inheritDoc} */
305: public void setLongProperty(String name, long value) {
306: adapter.setLongProperty(name, value);
307: }
308:
309: /** {@inheritDoc} */
310: public void setCorrelationId(String id) {
311: adapter.setCorrelationId(id);
312: }
313:
314: /** {@inheritDoc} */
315: public String getCorrelationId() {
316: return adapter.getCorrelationId();
317: }
318:
319: /** {@inheritDoc} */
320: public void setReplyTo(Object replyTo) {
321: adapter.setReplyTo(replyTo);
322: }
323:
324: /** {@inheritDoc} */
325: public Object getReplyTo() {
326: return adapter.getReplyTo();
327: }
328:
329: /** {@inheritDoc} */
330: public int getCorrelationSequence() {
331: return adapter.getCorrelationSequence();
332: }
333:
334: /** {@inheritDoc} */
335: public void setCorrelationSequence(int sequence) {
336: adapter.setCorrelationSequence(sequence);
337: }
338:
339: /** {@inheritDoc} */
340: public int getCorrelationGroupSize() {
341: return adapter.getCorrelationGroupSize();
342: }
343:
344: //** {@inheritDoc} */
345: public void setCorrelationGroupSize(int size) {
346: adapter.setCorrelationGroupSize(size);
347: }
348:
349: /** {@inheritDoc} */
350: public ExceptionPayload getExceptionPayload() {
351: return adapter.getExceptionPayload();
352: }
353:
354: /** {@inheritDoc} */
355: public void setExceptionPayload(ExceptionPayload exceptionPayload) {
356: adapter.setExceptionPayload(exceptionPayload);
357: }
358:
359: /** {@inheritDoc} */
360: public String toString() {
361: return adapter.toString();
362: }
363:
364: /** {@inheritDoc} */
365: public void addAttachment(String name, DataHandler dataHandler)
366: throws Exception {
367: adapter.addAttachment(name, dataHandler);
368: }
369:
370: /** {@inheritDoc} */
371: public void removeAttachment(String name) throws Exception {
372: adapter.removeAttachment(name);
373: }
374:
375: /** {@inheritDoc} */
376: public DataHandler getAttachment(String name) {
377: return adapter.getAttachment(name);
378: }
379:
380: /** {@inheritDoc} */
381: public Set getAttachmentNames() {
382: return adapter.getAttachmentNames();
383: }
384:
385: /** {@inheritDoc} */
386: public String getEncoding() {
387: return adapter.getEncoding();
388: }
389:
390: /** {@inheritDoc} */
391: public void setEncoding(String encoding) {
392: adapter.setEncoding(encoding);
393: }
394:
395: /** {@inheritDoc} */
396: public String getStringProperty(String name, String defaultValue) {
397: return adapter.getStringProperty(name, defaultValue);
398: }
399:
400: /** {@inheritDoc} */
401: public void setStringProperty(String name, String value) {
402: adapter.setStringProperty(name, value);
403: }
404:
405: /** {@inheritDoc} */
406: public void addProperties(Map properties) {
407: adapter.addProperties(properties);
408: }
409:
410: /** {@inheritDoc} */
411: public void clearProperties() {
412: adapter.clearProperties();
413: }
414:
415: /** {@inheritDoc} */
416: public Object getPayload() {
417: return adapter.getPayload();
418: }
419:
420: /** {@inheritDoc} */
421: public synchronized void setPayload(Object payload) {
422: //TODO we may want to enforce stricter rules here, rather than silently wrapping the existing adapter
423: if (!(adapter instanceof MutableMessageAdapter)) {
424: adapter = new DefaultMessageAdapter(payload, adapter);
425: } else {
426: ((MutableMessageAdapter) adapter).setPayload(payload);
427: }
428: cache = null;
429: }
430:
431: /** {@inheritDoc} */
432: public void release() {
433: adapter.release();
434: if (originalAdapter != null) {
435: originalAdapter.release();
436: }
437: cache = null;
438: appliedTransformerHashCodes.clear();
439: }
440:
441: /** {@inheritDoc} */
442: public void applyTransformers(List transformers)
443: throws TransformerException {
444: applyTransformers(transformers, null);
445: }
446:
447: public void applyTransformers(List transformers, Class outputType)
448: throws TransformerException {
449: if (transformers.size() > 0
450: && !appliedTransformerHashCodes.contains(new Integer(
451: transformers.hashCode()))) {
452: applyAllTransformers(transformers);
453: appliedTransformerHashCodes.add(new Integer(transformers
454: .hashCode()));
455: }
456:
457: if (null != outputType
458: && !getPayload().getClass()
459: .isAssignableFrom(outputType)) {
460: setPayload(getPayload(outputType));
461: }
462: }
463:
464: protected void applyAllTransformers(List transformers)
465: throws TransformerException {
466: if (transformers.size() > 0) {
467:
468: Iterator iterator = transformers.iterator();
469: while (iterator.hasNext()) {
470: Transformer transformer = (Transformer) iterator.next();
471:
472: if (getPayload() == null) {
473: if (transformer.isAcceptNull()) {
474: setPayload(NullPayload.getInstance());
475: } else {
476: if (logger.isDebugEnabled()) {
477: logger
478: .debug("Transformer "
479: + transformer
480: + " doesn't support the null payload, exiting from transformer chain.");
481: }
482: break;
483: }
484: }
485:
486: Class srcCls = getPayload().getClass();
487: if (transformer.isSourceTypeSupported(srcCls)) {
488: Object result = transformer.transform(this );
489:
490: if (originalAdapter == null
491: && MuleServer.getMuleContext()
492: .getConfiguration()
493: .isCacheMessageOriginalPayload()) {
494: originalAdapter = adapter;
495: }
496: //TODO RM*: Must make sure this works for all scenarios
497: if (result instanceof MuleMessage) {
498: synchronized (this ) {
499: adapter = ((MuleMessage) result)
500: .getAdapter();
501: }
502: } else {
503: setPayload(result);
504: }
505: } else {
506: if (logger.isDebugEnabled()) {
507: logger
508: .debug("Transformer "
509: + transformer
510: + " doesn't support the source payload: "
511: + srcCls);
512: }
513: if (!transformer.isIgnoreBadInput()) {
514: if (logger.isDebugEnabled()) {
515: logger
516: .debug("Exiting from transformer chain (ignoreBadInput = false)");
517: }
518: break;
519: }
520: }
521: }
522: }
523: }
524:
525: //////////////////////////////// ThreadSafeAccess Impl ///////////////////////////////
526:
527: /** {@inheritDoc} */
528: public ThreadSafeAccess newThreadCopy() {
529: if (adapter instanceof ThreadSafeAccess) {
530: logger.debug("new copy of message for "
531: + Thread.currentThread());
532: return new DefaultMuleMessage(((ThreadSafeAccess) adapter)
533: .newThreadCopy(), this );
534: } else {
535: // not much we can do here - streaming will have to handle things itself
536: return this ;
537: }
538: }
539:
540: /** {@inheritDoc} */
541: public void resetAccessControl() {
542: if (adapter instanceof AbstractMessageAdapter) {
543: ((AbstractMessageAdapter) adapter).resetAccessControl();
544: }
545: }
546:
547: /** {@inheritDoc} */
548: public void assertAccess(boolean write) {
549: if (adapter instanceof AbstractMessageAdapter) {
550: ((AbstractMessageAdapter) adapter).assertAccess(write);
551: }
552: }
553: }
|