001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)MessageExchangeImpl.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.messaging;
030:
031: import com.sun.jbi.messaging.stats.METimestamps;
032:
033: import com.sun.jbi.messaging.util.Translator;
034:
035: import java.net.URI;
036: import java.util.HashMap;
037: import java.util.Iterator;
038: import java.util.Map;
039: import java.util.Set;
040: import java.util.logging.Logger;
041:
042: import javax.jbi.messaging.ExchangeStatus;
043: import javax.jbi.messaging.Fault;
044: import javax.jbi.messaging.MessageExchange;
045: import javax.jbi.messaging.MessagingException;
046: import javax.jbi.messaging.NormalizedMessage;
047: import javax.jbi.servicedesc.ServiceEndpoint;
048:
049: import javax.transaction.SystemException;
050: import javax.transaction.Transaction;
051:
052: import javax.xml.namespace.QName;
053:
054: /** This serves as the container about all of the state of a message exchange.
055: * The objects presented to the binding and engine are instances of
056: * MessageExchangeProxy. The MessageExchangeProxy forwards all get/set
057: * requests to here. This class maintains a reference to each.
058: * @author Sun Microsystems, Inc.
059: */
060: public class MessageExchangeImpl implements
061: javax.jbi.messaging.MessageExchange, Cloneable {
062: /** Default factory for normalized messages. */
063: private static MessageFactory sMsgFac;
064:
065: private Logger mLog = Logger.getLogger(this .getClass().getPackage()
066: .getName());
067:
068: /** Service reference for this exchange. */
069: private QName mService;
070: /** Interface name for this exchange. */
071: private QName mInterfaceName;
072: /** Endpoint reference for this exchange. */
073: private ServiceEndpoint mEndpoint;
074: /** Operation name for this exchange. */
075: private QName mOperation;
076: /** WSDL Fault. */
077: private Fault mFault;
078: /** Stores messages for this exchange. */
079: private HashMap mMessages;
080: /** Stores exchange properties. */
081: private HashMap mProperties;
082: private HashMap mDeltaProperties;
083:
084: /** Unique Id for exchange. */
085: private String mExchangeId;
086: /** Status of the exchange. */
087: private ExchangeStatus mStatus;
088: /** Error information. */
089: private Exception mError;
090:
091: /** Transaction for this exchange. Since the javax.transaction is not
092: * part of Java SE, we need to store the Transaction reference as an object.
093: */
094: private Object mTransaction;
095:
096: private boolean mAutoSuspendResume;
097: /** Source MessageExchange. */
098: private MessageExchangeProxy mSource;
099: /** MessageService */
100: private MessageService mMsgSvc;
101: /** Created Remotely */
102: private boolean mRemote;
103: /** MessageExchange statistics. */
104: private METimestamps mStamps;
105:
106: /** Static init of random number generator and message factory. */
107: static {
108: sMsgFac = MessageFactory.getInstance();
109: }
110:
111: /**
112: * Create a new message exchange for the given proxy.
113: * @param proxy MessageExchangeProxy for pattern
114: */
115: MessageExchangeImpl(MessageExchangeProxy proxy,
116: MessageService msgSvc) {
117: boolean statisticsEnabled = msgSvc.statisticsEnabled();
118:
119: mMsgSvc = msgSvc;
120: mSource = proxy;
121: mAutoSuspendResume = false;
122: proxy.setMessageExchange(this , statisticsEnabled);
123: if (statisticsEnabled) {
124: mStamps = new METimestamps();
125: }
126: init();
127: }
128:
129: /**
130: * Create a new message exchange for the given proxy and exchange.
131: * USed by the Observer mechanism to create the observed at that point in time.
132: * @param proxy MessageExchangeProxy for pattern
133: */
134: MessageExchangeImpl(MessageExchangeProxy proxy,
135: MessageExchangeImpl source, MessageService msgSvc) {
136: mMsgSvc = msgSvc;
137: mSource = proxy;
138: mAutoSuspendResume = false;
139: proxy.setMessageExchange(this , false);
140: mExchangeId = source.mExchangeId;
141: mProperties = (HashMap) source.mProperties.clone();
142: mDeltaProperties = (HashMap) source.mDeltaProperties.clone();
143: mStatus = source.mStatus;
144: mFault = source.mFault;
145: mInterfaceName = source.mInterfaceName;
146: mOperation = source.mOperation;
147: mService = source.mService;
148: mEndpoint = source.mEndpoint;
149: mError = source.mError;
150: mMessages = new HashMap();
151: }
152:
153: /** Initialize instance vars and set state to 0. */
154: private void init() {
155: mExchangeId = mMsgSvc.generateNextId();
156: mProperties = new HashMap();
157: mDeltaProperties = new HashMap();
158: mMessages = new HashMap();
159: mStatus = ExchangeStatus.ACTIVE;
160: mRemote = false;
161: }
162:
163: /** Deferred to message factory. */
164: public Fault createFault() throws MessagingException {
165: return sMsgFac.createFault();
166: }
167:
168: /** Deferred to message factory. */
169: public NormalizedMessage createMessage() throws MessagingException {
170: return sMsgFac.createMessage();
171: }
172:
173: /** Obvious. */
174: public ServiceEndpoint getEndpoint() {
175: return mEndpoint;
176: }
177:
178: /** Obvious. */
179: public Exception getError() {
180: return mError;
181: }
182:
183: /** Obvious. */
184: public String getExchangeId() {
185: return mExchangeId;
186: }
187:
188: /** Obvious. */
189: public Fault getFault() {
190: return mFault;
191: }
192:
193: /** Return the message associated with the given reference id. */
194: public NormalizedMessage getMessage(String name) {
195: return (NormalizedMessage) mMessages.get(name);
196: }
197:
198: /** Obvious. */
199: public QName getOperation() {
200: return mOperation;
201: }
202:
203: /** Obvious. */
204: public URI getPattern() {
205: return (mSource == null ? null : mSource.getPattern());
206: }
207:
208: /** Obvious. */
209: public Object getProperty(String name) {
210: Object prop = mDeltaProperties.get(name);
211: if (prop == null) {
212: prop = mProperties.get(name);
213: }
214: return (prop);
215: }
216:
217: /** Obvious. */
218: public Set getPropertyNames() {
219: Set names;
220:
221: (names = mProperties.keySet())
222: .addAll(mDeltaProperties.keySet());
223: return (names);
224: }
225:
226: public MessageExchange.Role getRole() {
227: return (null);
228: }
229:
230: /** Obvious. */
231: public QName getService() {
232: return mService;
233: }
234:
235: /** Obvious. */
236: public ExchangeStatus getStatus() {
237: return mStatus;
238: }
239:
240: /** Special for serialization. */
241: Set getDeltaProperties() {
242: return (mDeltaProperties.entrySet());
243: }
244:
245: /** Used for deserialization support. */
246: void setExchangeId(String id) {
247: mExchangeId = id;
248: mRemote = true;
249: }
250:
251: /** Only allowed on a freshly created exchange. */
252: public void setEndpoint(ServiceEndpoint endpoint) {
253: mEndpoint = endpoint;
254: }
255:
256: /** Sets the exception and automatically advances status to Error. */
257: public void setError(Exception error) {
258: mError = error;
259: }
260:
261: /** Set a fault on the exchange. */
262: public void setFault(Fault fault) {
263: mFault = fault;
264: }
265:
266: /** Set a message on the exchange. */
267: public void setMessage(NormalizedMessage msg, String name) {
268: if (msg == null) {
269: mMessages.remove(name);
270: } else {
271: mMessages.put(name, msg);
272: }
273: }
274:
275: /** Only allowed on a freshly created exchange. */
276: public void setOperation(QName name) {
277: mOperation = name;
278: }
279:
280: /** Obvious. */
281: public void setProperty(String name, Object obj) {
282: if (name.equals(JTA_TRANSACTION_PROPERTY_NAME)) {
283: try {
284: setTransactionContext((Transaction) obj);
285: } catch (javax.jbi.messaging.MessagingException msgEx) {
286: mLog.warning(msgEx.toString());
287: }
288: }
289:
290: mDeltaProperties.put(name, obj);
291: }
292:
293: public void mergeProperties() {
294: mProperties.putAll(mDeltaProperties);
295: mDeltaProperties.clear();
296: }
297:
298: boolean isRemote() {
299: return (mRemote);
300: }
301:
302: /** Only allowed on a freshly created exchange. */
303: public void setService(QName service) {
304: mService = service;
305: }
306:
307: /** Set status of the exchange. ERROR status is allowed at any point. */
308: public void setStatus(ExchangeStatus status) {
309: mStatus = status;
310: }
311:
312: public QName getInterfaceName() {
313: return mInterfaceName;
314: }
315:
316: public void setInterfaceName(QName interfaceName) {
317: mInterfaceName = interfaceName;
318: }
319:
320: /** Only allowed on a freshly created exchange. */
321: void setTransactionContext(Transaction xact)
322: throws javax.jbi.messaging.MessagingException {
323: if (xact == null) {
324: try {
325: xact = mMsgSvc.getTransactionManager().getTransaction();
326: }
327: //catch (javax.transaction.SystemException se)
328: catch (Exception ex) {
329: throw new javax.jbi.messaging.MessagingException(
330: Translator
331: .translate(LocalStringKeys.CANT_GET_DEFAULT_TRANSACTION));
332: }
333: }
334: mTransaction = xact;
335:
336: }
337:
338: /** Only allowed on a freshly created exchange. */
339: public Transaction getTransactionContext() {
340: return (Transaction) mTransaction;
341: }
342:
343: /** Only allowed on a freshly created exchange. */
344: public boolean isTransacted() {
345: return (mTransaction != null);
346: }
347:
348: MessageExchangeProxy getSource() {
349: return (mSource);
350: }
351:
352: /** Suspend the current transaction if defined. */
353: void suspendTX() throws javax.jbi.messaging.MessagingException {
354: if (mTransaction != null) {
355: try {
356: if (mAutoSuspendResume) {
357: mLog.fine(Translator.translate(
358: LocalStringKeys.EXCHANGE_SUSPEND_TX,
359: new Object[] { mExchangeId }));
360:
361: if (mTransaction == mMsgSvc.getTransactionManager()
362: .getTransaction()) {
363: mMsgSvc.getTransactionManager().suspend();
364: }
365: } else {
366: if (mMsgSvc.getTransactionManager()
367: .getTransaction() == mTransaction) {
368: throw new javax.jbi.messaging.MessagingException(
369: Translator
370: .translate(LocalStringKeys.MUST_SUSPEND));
371: }
372: }
373: }
374: //catch (javax.transaction.SystemException se)
375: catch (Exception ex) {
376: throw new javax.jbi.messaging.MessagingException(
377: Translator
378: .translate(LocalStringKeys.CANT_SUSPEND));
379: }
380: }
381: }
382:
383: /** Resume the current transaction if defined. */
384: void resumeTX() throws javax.jbi.messaging.MessagingException {
385: if (mTransaction != null && mAutoSuspendResume) {
386: try {
387: mLog.fine(Translator.translate(
388: LocalStringKeys.EXCHANGE_RESUME_TX,
389: new Object[] { mExchangeId }));
390: mMsgSvc.getTransactionManager().resume(
391: (Transaction) mTransaction);
392: } catch (javax.transaction.InvalidTransactionException ite) {
393: throw new javax.jbi.messaging.MessagingException(
394: Translator
395: .translate(LocalStringKeys.CANT_RESUME_INVALID));
396: }
397: //catch (javax.transaction.SystemException se)
398: catch (Exception ex) {
399: throw new javax.jbi.messaging.MessagingException(
400: Translator
401: .translate(LocalStringKeys.CANT_RESUME));
402: }
403: }
404: }
405:
406: //
407: // Capture a timestamp associated with the given tag.
408: //
409: void capture(byte tag) {
410: if (mStamps != null) {
411: mStamps.capture(tag);
412: }
413: }
414:
415: METimestamps getTimestamps() {
416: return (mStamps);
417: }
418:
419: public String toString() {
420: StringBuilder sb = new StringBuilder();
421: sb.append("\n Status: ");
422: sb.append(mStatus.equals(ExchangeStatus.ACTIVE) ? "ACTIVE"
423: : (mStatus.equals(ExchangeStatus.DONE) ? "DONE"
424: : "ERROR"));
425: sb.append(" Location: ");
426: sb.append(mRemote ? "REMOTE" : "LOCAL");
427: sb.append("\n Service: ");
428: sb.append(mService);
429: sb.append("\n Endpoint:\n");
430: sb.append(((RegisteredEndpoint) mEndpoint).toStringBrief());
431: sb.append(" Operation: ");
432: sb.append(mOperation == null ? "null" : mOperation);
433: sb.append("\n InterfaceName: ");
434: sb.append(mInterfaceName == null ? "null" : mInterfaceName);
435: sb.append("\n Transaction: ");
436: sb.append(mTransaction == null ? "None" : mTransaction
437: .toString());
438: sb.append("\n Properties Count: ");
439: sb.append(mProperties.size());
440: sb.append("\n");
441: for (Iterator p = mProperties.entrySet().iterator(); p
442: .hasNext();) {
443: Map.Entry me = (Map.Entry) p.next();
444: sb.append(" Name: ");
445: sb.append((String) me.getKey());
446: if (me.getValue() instanceof String) {
447: sb.append("\n Value: ");
448: sb.append((String) me.getValue());
449: } else {
450: sb.append("\n Value(Type): ");
451: sb.append(me.getValue() == null ? "null" : me
452: .getValue().getClass().getName());
453: }
454: sb.append("\n");
455: }
456: sb.append(" DeltaProperties Count: ");
457: sb.append(mDeltaProperties.size());
458: sb.append("\n");
459: for (Iterator p = mDeltaProperties.entrySet().iterator(); p
460: .hasNext();) {
461: Map.Entry me = (Map.Entry) p.next();
462: sb.append(" Name: ");
463: sb.append((String) me.getKey());
464: if (me.getValue() instanceof String) {
465: sb.append("\n Value: ");
466: sb.append((String) me.getValue());
467: } else {
468: sb.append("\n Value(Type): ");
469: sb.append(me.getValue() == null ? "null" : me
470: .getValue().getClass().getName());
471: }
472: sb.append("\n");
473: }
474: sb.append(" Messages Count: ");
475: sb.append(mMessages.size());
476: sb.append("\n");
477: for (Iterator m = mMessages.entrySet().iterator(); m.hasNext();) {
478: Map.Entry me = (Map.Entry) m.next();
479: sb.append(" Message Name: ");
480: sb.append((String) me.getKey());
481: sb.append("\n");
482: sb.append(me.getKey() == null ? "null" : me.getValue()
483: .toString());
484: }
485: if (mFault != null) {
486: sb.append(" Fault: \n");
487: sb.append(mFault.toString());
488: }
489: if (mStamps != null) {
490: sb.append(" " + mStamps.toString());
491: }
492: return (sb.toString());
493: }
494: }
|