001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)MessageExchangeProxy.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.messaging;
030:
031: import com.sun.jbi.messaging.stats.METimestamps;
032:
033: import com.sun.jbi.messaging.util.Translator;
034:
035: import java.net.URI;
036:
037: import java.util.HashMap;
038: import java.util.Set;
039:
040: import java.util.logging.Logger;
041:
042: import javax.jbi.messaging.ExchangeStatus;
043: import javax.jbi.messaging.Fault;
044: import javax.jbi.messaging.MessagingException;
045: import javax.jbi.messaging.NormalizedMessage;
046:
047: import javax.jbi.servicedesc.ServiceEndpoint;
048:
049: import javax.xml.namespace.QName;
050:
051: /**
052: * This abstract class is used as the base for a MessageExchange reference
053: * given to a Binding or Engine. The binding and engine are given different
054: * instances, because the states naviagated during execution are different.
055: * Each instance also tracks its twin.
056: *
057: * It's mostly just a proxy to the underlying MessageExchangeImpl. The proxy
058: * tracks the status of the reference to determine which operations are legal
059: * at any point in time. Legal operations are forwarded to the underlying
060: * MessageExchangeImpl.
061: *
062: * Each subclass is expected to specify a state machine that will be used
063: * by this proxy to sequence the actions. The state machine is very simple.
064: * There are only 4 actions: SEND, ACCEPT, SET_STATUS, SET_FAULT.
065: * Along with each state there is a set (implemented as a bit-mask) of
066: * operations that are legal in the current state. The structure is
067: * represented as a simple [][]. The first dimension is the array of
068: * states, the second dimension contains the valid set, followed by the
069: * next array index for each action. An index of -1 means an illegal
070: * request (the code in here should only request legal transisions.)
071: *
072: * Each subclass implements 2 state machines, one for the SOURCE side and
073: * one for the TARGET side of a message exchange.
074: *
075: * @author Sun Microsystems, Inc.
076: */
077: public abstract class MessageExchangeProxy implements
078: com.sun.jbi.messaging.MessageExchange {
079: /*********************************************
080: * State machine operations *
081: ********************************************/
082: /** Allowed to set transaction. */
083: public static final int SET_TRANSACTION = 0x0000001;
084: /** Category of actions used to address a message exchange:
085: * SET_SERVICE, SET_OPERATION, SET_ENDPOINT, SET_INTERFACE
086: */
087: public static final int ADDRESS = 0x0000002;
088: /** Allowed to set property. */
089: public static final int SET_PROPERTY = 0x0000010;
090: /** Allowed to set fault. */
091: public static final int SET_FAULT = 0x0000020;
092: /** Allowed to set done status. */
093: public static final int SET_DONE = 0x0000040;
094: /** Allowed to set error status. */
095: public static final int SET_ERROR = 0x0000080;
096: /** Allowed to set IN message */
097: public static final int SET_IN = 0x0001000;
098: /** Allowed to set OUT message */
099: public static final int SET_OUT = 0x0002000;
100: /** Allowed to create fault. */
101: public static final int CREATE_FAULT = 0x0004000;
102: /** Allowed to SENDSYNCH. */
103: public static final int DO_SENDSYNCH = 0x0008000;
104: /** Allowed to SEND */
105: public static final int DO_SEND = 0x0010000;
106: /** Allowed to ACCEPT */
107: public static final int DO_ACCEPT = 0x0020000;
108: /** Hint SUSPEND_TX */
109: public static final int SUSPEND_TX = 0x0100000;
110: /** Hint RESUME_TX */
111: public static final int RESUME_TX = 0x0200000;
112: /** Hint DONE */
113: public static final int MARK_DONE = 0x0400000;
114: /** Hint ACTIVE */
115: public static final int MARK_ACTIVE = 0x0800000;
116: /** Hint COMPLETE */
117: public static final int COMPLETE = 0x1000000;
118: /** Hint check STATUS or FAULT */
119: public static final int CHECK_STATUS_OR_FAULT = 0x2000000;
120: /** Hint is a REQUEST. */
121: public static final int REQUEST = 0x4000000;
122: /** Hint is a STATUS. */
123: public static final int STATUS = 0x8000000;
124:
125: /*********************************************
126: * State machine actions *
127: ********************************************/
128: /** Column 1: Legal operation MASK. */
129: public static final int ACTION_MASK = 0;
130: /** Column 2: Action SEND */
131: public static final int ACTION_SEND = 1;
132: /** Column 3: Action ACCEPT */
133: public static final int ACTION_ACCEPT = 2;
134: /** Column 4: Action STATUS */
135: public static final int ACTION_STATUS = 3;
136: /** Column 5: Action FAULT */
137: public static final int ACTION_FAULT = 4;
138:
139: /*********************************************
140: * Message names *
141: ********************************************/
142: /** Name of IN message. */
143: static final String IN_MSG = new String("in");
144: /** Name of OUT message. */
145: static final String OUT_MSG = new String("out");
146:
147: private Logger mLog = Logger.getLogger(this .getClass().getPackage()
148: .getName());
149:
150: /**
151: * The underlying MessageExchangeImpl that we are proxying
152: */
153: private MessageExchangeImpl mMEI;
154:
155: /**
156: * The state machine used to control this exchange.
157: */
158: private int[][] mState;
159:
160: /**
161: * The current index into the state machine.
162: */
163: private int mStateIndex;
164:
165: /**
166: * Our twin MessageExchangeProxy.
167: */
168: private MessageExchangeProxy mTwin;
169:
170: /**
171: * Delivery channel for our twins side.
172: */
173: private DeliveryChannelImpl mSendChannel;
174:
175: /**
176: * Linked endpoint enabled through a service connection. This endpoint
177: * is only visible to the service consumer; the provider sees the
178: * actual endpoint that was activated.
179: */
180: private ServiceEndpoint mEndpointLink;
181:
182: private boolean mStatisticsEnabled;
183:
184: /**
185: * Synchronous support.
186: */
187: private int mSynchState;
188: static final int NONE = 0;
189: static final int WAIT = 1;
190: static final int WAIT_TIMEOUT = 2;
191: static final int HALF_DONE = 3;
192: static final int DONE = 4;
193: static final int ERROR = 5;
194:
195: /**
196: * Messaging phase (used for statistics routing.)
197: */
198: private int mPhase;
199: static final int PHASE_NONE = 0;
200: static final int PHASE_DONE = 1;
201: static final int PHASE_ERROR = 3;
202: static final int PHASE_REQUEST = 4;
203: static final int PHASE_REPLY = 5;
204: static final int PHASE_FAULT = 6;
205:
206: private int mPhaseMask;
207: static final int PM_SEND_DONE = 0x0001;
208: static final int PM_SEND_ERROR = 0x0002;
209: static final int PM_SEND_REQUEST = 0x0004;
210: static final int PM_SEND_REPLY = 0x0008;
211: static final int PM_SEND_FAULT = 0x0010;
212: static final int PM_RECEIVE_DONE = 0x0100;
213: static final int PM_RECEIVE_ERROR = 0x0200;
214: static final int PM_RECEIVE_REQUEST = 0x0400;
215: static final int PM_RECEIVE_REPLY = 0x0800;
216: static final int PM_RECEIVE_FAULT = 0x1000;
217:
218: /**
219: * Constructor.
220: */
221: MessageExchangeProxy(int[][] states) {
222: mState = states;
223: mStateIndex = 0;
224: mSynchState = NONE;
225: }
226:
227: /**
228: * Create a twin of ourselves in the target role.
229: */
230: abstract MessageExchangeProxy newTwin();
231:
232: /**
233: * Get ExchangeId associated with this MessageExchange.
234: */
235: public String getExchangeId() {
236: return (mMEI.getExchangeId());
237: }
238:
239: /**
240: * Get status of this MessageExchange.
241: */
242: public ExchangeStatus getStatus() {
243: return (mMEI.getStatus());
244: }
245:
246: /**
247: * Set status of this MessageExchange.
248: */
249: public void setStatus(ExchangeStatus status)
250: throws javax.jbi.messaging.MessagingException {
251: boolean isDone = status.equals(ExchangeStatus.DONE);
252: boolean isError = status.equals(ExchangeStatus.ERROR);
253:
254: if ((isDone && can(SET_DONE)) || (isError && can(SET_ERROR))) {
255: if (isDone) {
256: if (can(SET_OUT) && getMessage(OUT_MSG) != null) {
257: throw new javax.jbi.messaging.MessagingException(
258: Translator.translate(
259: LocalStringKeys.STATUS_ON_MSG, this
260: .getPattern().toString()));
261: }
262: if (mMEI.getStatus().equals(ExchangeStatus.ERROR)) {
263: throw new javax.jbi.messaging.MessagingException(
264: Translator.translate(
265: LocalStringKeys.ERROR_STATUS, this
266: .getPattern().toString()));
267: }
268: } else {
269: mMEI.setMessage(null, OUT_MSG);
270: }
271: mMEI.setStatus(status);
272: mPhase = isDone ? PHASE_DONE : PHASE_ERROR;
273: mPhaseMask |= (isDone ? PM_SEND_DONE : PM_SEND_ERROR);
274: nextState(ACTION_STATUS);
275: return;
276: }
277:
278: throw new javax.jbi.messaging.MessagingException(Translator
279: .translate(LocalStringKeys.PATTERN_INCONSISTENT, this
280: .getPattern().toString(), this == mMEI
281: .getSource() ? Translator
282: .translate(LocalStringKeys.SOURCE) : Translator
283: .translate(LocalStringKeys.TARGET), Integer
284: .valueOf(mStateIndex)));
285: }
286:
287: /**
288: * Get Exception describing the exchanges error status.
289: */
290: public Exception getError() {
291: return (mMEI.getError());
292: }
293:
294: /**
295: * Set Exception describing reason for error status.
296: */
297: public void setError(Exception error) {
298: mMEI.setMessage(null, OUT_MSG);
299: mMEI.setStatus(ExchangeStatus.ERROR);
300: mMEI.setError(error);
301: mPhase = PHASE_ERROR;
302: mPhaseMask |= PM_SEND_ERROR;
303: nextState(ACTION_STATUS);
304: }
305:
306: /**
307: * Get the Fault message for this exchange.
308: */
309: public Fault getFault() {
310: return (mMEI.getFault());
311: }
312:
313: /**
314: * Set Fault message for this exchange.
315: */
316: public void setFault(Fault fault) throws MessagingException {
317: if (!can(SET_FAULT)) {
318: throw new javax.jbi.messaging.MessagingException(Translator
319: .translate(LocalStringKeys.FAULT_NOT_SUPPORTED,
320: this .getPattern().toString()));
321: }
322: if ((can(SET_IN) && getMessage(IN_MSG) != null)
323: || (can(SET_OUT) && getMessage(OUT_MSG) != null)) {
324: throw new javax.jbi.messaging.MessagingException(Translator
325: .translate(LocalStringKeys.FAULT_ON_MSG, this
326: .getPattern().toString()));
327: }
328:
329: mMEI.setFault(fault);
330: mPhase = PHASE_FAULT;
331: mPhaseMask |= PM_SEND_FAULT;
332: nextState(ACTION_FAULT);
333: }
334:
335: /**
336: * Get the Endpoint for this exchange. If a service connection was used
337: * to address this exchange, that endpoint is returned to the consumer. The
338: * provider always sees the 'real' endpoint that it activated. If a service
339: * connection was not used, the provider and consumer see the same endpoint.
340: */
341: public ServiceEndpoint getEndpoint() {
342: if (mEndpointLink != null
343: && getRole().equals(MessageExchange.Role.CONSUMER)) {
344: return mEndpointLink;
345: } else {
346: return mMEI.getEndpoint();
347: }
348: }
349:
350: /**
351: * Set Endpoint for this exchange.
352: */
353: public void setEndpoint(ServiceEndpoint endPoint) {
354: if (can(ADDRESS)) {
355: mMEI.setEndpoint(endPoint);
356: }
357: }
358:
359: /**
360: * Get the Service for this exchange.
361: */
362: public QName getService() {
363: return (mMEI.getService());
364: }
365:
366: /**
367: * Set Service for this exchange.
368: */
369: public void setService(QName service) {
370: if (can(ADDRESS)) {
371: mMEI.setService(service);
372: }
373: }
374:
375: /**
376: * Get the Service for this exchange.
377: */
378: public QName getOperation() {
379: return (mMEI.getOperation());
380: }
381:
382: /**
383: * Set Operation for this exchange.
384: */
385: public void setOperation(QName operation) {
386: if (can(ADDRESS)) {
387: mMEI.setOperation(operation);
388: }
389: }
390:
391: public QName getInterfaceName() {
392: return mMEI.getInterfaceName();
393: }
394:
395: public void setInterfaceName(QName interfaceName) {
396: if (can(ADDRESS)) {
397: mMEI.setInterfaceName(interfaceName);
398: }
399: }
400:
401: /**
402: * Get a NormalizedMEssage by reference.
403: */
404: public NormalizedMessage getMessage(String name) {
405: return (mMEI.getMessage(name));
406: }
407:
408: /**
409: * Set a NormalizedMessage by reference.
410: */
411: public void setMessage(NormalizedMessage message, String name)
412: throws MessagingException {
413: if ((can(SET_IN) && name.equals(IN_MSG))
414: || (can(SET_OUT) && name.equals(OUT_MSG))) {
415: mMEI.setMessage(message, name);
416: if (name.equals(IN_MSG)) {
417: mPhase = PHASE_REQUEST;
418: mPhaseMask |= PM_SEND_REQUEST;
419: } else {
420: mPhase = PHASE_REPLY;
421: mPhaseMask |= PM_SEND_REPLY;
422: }
423: } else {
424: throw new javax.jbi.messaging.MessagingException(
425: Translator
426: .translate(
427: LocalStringKeys.PATTERN_INCONSISTENT,
428: this .getPattern().toString(),
429: this == mMEI.getSource() ? Translator
430: .translate(LocalStringKeys.SOURCE)
431: : Translator
432: .translate(LocalStringKeys.TARGET),
433: Integer.valueOf(mStateIndex)));
434:
435: }
436: }
437:
438: /**
439: * Get a property.
440: */
441: public Object getProperty(String name) {
442: return (mMEI.getProperty(name));
443: }
444:
445: /**
446: * Set a property.
447: */
448: public void setProperty(String name, Object object) {
449: if (can(SET_PROPERTY)) {
450: if (name.equals(JTA_TRANSACTION_PROPERTY_NAME)) {
451: if (can(SET_TRANSACTION)) {
452: mMEI.setProperty(name, object);
453: }
454: } else {
455: mMEI.setProperty(name, object);
456: }
457: }
458: }
459:
460: public java.util.Set getPropertyNames() {
461: return mMEI.getPropertyNames();
462: }
463:
464: /**
465: * Return the role to take in this exchange.
466: */
467: public Role getRole() {
468: return ((mState[0][ACTION_MASK] & DO_ACCEPT) == 0 ? Role.CONSUMER
469: : Role.PROVIDER);
470: }
471:
472: public String getSourceComponent() {
473: DeliveryChannelImpl dc = mSendChannel;
474:
475: if ((mState[0][ACTION_MASK] & DO_ACCEPT) != 0) {
476: dc = mTwin.mSendChannel;
477: }
478:
479: return (dc.getChannelId());
480: }
481:
482: public String getTargetComponent() {
483: DeliveryChannelImpl dc = mSendChannel;
484:
485: if ((mState[0][ACTION_MASK] & DO_ACCEPT) == 0) {
486: dc = mTwin.mSendChannel;
487: }
488:
489: return (dc.getChannelId());
490: }
491:
492: /**
493: * Check existence of a transaction.
494: */
495: public boolean isTransacted() {
496: return (mMEI.isTransacted());
497: }
498:
499: /**
500: * Create a Fault.
501: */
502: public Fault createFault() throws MessagingException {
503: if (!can(CREATE_FAULT)) {
504: throw new javax.jbi.messaging.MessagingException(Translator
505: .translate(LocalStringKeys.FAULT_NOT_SUPPORTED));
506: }
507: return (new FaultImpl());
508: }
509:
510: /**
511: * Create a Message.
512: */
513: public NormalizedMessage createMessage() throws MessagingException {
514: return (new MessageImpl());
515: }
516:
517: /**
518: * Set the identity of our twin.
519: */
520: void setTwin(MessageExchangeProxy mep) {
521: mTwin = mep;
522: }
523:
524: /**
525: * Get the identity of our twin.
526: */
527: MessageExchangeProxy getTwin() {
528: return (mTwin);
529: }
530:
531: void setSynchState(int state) {
532: mSynchState = state;
533: }
534:
535: int getSynchState() {
536: return (mSynchState);
537: }
538:
539: public int getPhase() {
540: int phase = mPhase;
541:
542: mPhase = PHASE_NONE;
543: return (phase);
544: }
545:
546: public int getPhaseMask() {
547: return (mPhaseMask);
548: }
549:
550: /**
551: * Set our MessageExchange.
552: */
553: void setMessageExchange(MessageExchangeImpl me, boolean statsEnabled) {
554: mMEI = me;
555: mStatisticsEnabled = statsEnabled;
556: }
557:
558: /**
559: * Get our MessageExchange.
560: */
561: MessageExchangeImpl getMessageExchange() {
562: return (mMEI);
563: }
564:
565: boolean isSource() {
566: return (this == mMEI.getSource());
567: }
568:
569: /**
570: * Validate that we can send.
571: */
572: void validate(DeliveryChannelImpl channel, boolean isSynch)
573: throws javax.jbi.messaging.MessagingException {
574: String message = null;
575:
576: //
577: // Perform any first time checks.
578: //
579: if (mSendChannel == null) {
580: ServiceEndpoint endpoint = getEndpoint();
581:
582: if (endpoint == null) {
583: message = Translator
584: .translate(LocalStringKeys.ADDR_NO_ENDPOINT);
585: } else if (!((RegisteredEndpoint) endpoint).isActive()) {
586: message = Translator.translate(
587: LocalStringKeys.INACTIVE_ENDPOINT, endpoint);
588: }
589: }
590:
591: //
592: // Checks made every time.
593: //
594: if (message == null) {
595: if (!can(DO_SEND)) {
596: message = Translator
597: .translate(
598: LocalStringKeys.SEND_NOT_LEGAL,
599: this .getPattern().toString(),
600: this == mMEI.getSource() ? Translator
601: .translate(LocalStringKeys.SOURCE)
602: : Translator
603: .translate(LocalStringKeys.TARGET),
604: Integer.valueOf(mStateIndex));
605:
606: } else if (isSynch && !can(DO_SENDSYNCH)) {
607: message = Translator
608: .translate(
609: LocalStringKeys.SENDSYNCH_NOT_LEGAL,
610: this .getPattern().toString(),
611: this == mMEI.getSource() ? Translator
612: .translate(LocalStringKeys.SOURCE)
613: : Translator
614: .translate(LocalStringKeys.TARGET),
615: Integer.valueOf(mStateIndex));
616: }
617: }
618:
619: //
620: // Throw if we end up here with error message.
621: //
622: if (message != null) {
623: throw new MessagingException(message);
624: }
625:
626: if (mSendChannel == null) {
627: setupChannels(channel);
628: }
629: }
630:
631: /**
632: * Setup channels between twins.
633: */
634: private void setupChannels(DeliveryChannelImpl dc)
635: throws javax.jbi.messaging.MessagingException {
636: String target;
637: DeliveryChannelImpl targetDc;
638: RegisteredEndpoint endpoint;
639: boolean invertedTwin = false;
640:
641: endpoint = (RegisteredEndpoint) mMEI.getEndpoint();
642: target = endpoint.getOwnerId();
643:
644: targetDc = dc.getChannel(target);
645: if (targetDc == null) {
646: throw new javax.jbi.messaging.MessagingException(Translator
647: .translate(LocalStringKeys.INVALID_DESTINATION));
648: }
649:
650: bindChannels(dc, targetDc);
651: }
652:
653: /**
654: * Set the sendChannel.
655: */
656: DeliveryChannelImpl getSendChannel() {
657: return (mSendChannel);
658: }
659:
660: /**
661: * Bind source and target channels and create target MessageExchange.
662: */
663: void bindChannels(DeliveryChannelImpl source,
664: DeliveryChannelImpl target) {
665: MessageExchangeProxy twin;
666:
667: twin = newTwin();
668: twin.setMessageExchange(mMEI, mStatisticsEnabled);
669: twin.mTwin = this ;
670: mTwin = twin;
671: mTwin.mSendChannel = source;
672: mSendChannel = target;
673: }
674:
675: /**
676: * Test if an Operation is legal in the current state.
677: * @param operation the operation to test
678: */
679: private boolean can(int operation) {
680: return ((mStateIndex >= 0) && ((mState[mStateIndex][ACTION_MASK] & operation) != 0));
681: }
682:
683: /**
684: * Force exchange into DONE state. Typically used after a sendSynch timeout
685: * or the closing of the source or target channel.
686: */
687: public synchronized boolean terminate() {
688: boolean terminated;
689:
690: if (can(SET_ERROR)) {
691: mStateIndex = mState[mStateIndex][ACTION_STATUS];
692: mMEI.setStatus(ExchangeStatus.ERROR);
693: } else if (can(DO_ACCEPT)) {
694: mMEI.setStatus(ExchangeStatus.ERROR);
695: }
696: terminated = !can(DO_SEND);
697: while (!can(MARK_DONE)) {
698: mStateIndex++;
699: }
700: mSynchState = ERROR;
701: return (terminated);
702: }
703:
704: /**
705: * Change to next state.
706: */
707: private void nextState(int state) {
708: int newStateIndex = mState[mStateIndex][state];
709:
710: if (newStateIndex > 0) {
711: mStateIndex = newStateIndex;
712: } else {
713: throw new IllegalStateException(Translator.translate(
714: LocalStringKeys.ILLEGAL_STATE_CHANGE, this
715: .getPattern().toString(), this == mMEI
716: .getSource() ? Translator
717: .translate(LocalStringKeys.SOURCE)
718: : Translator
719: .translate(LocalStringKeys.TARGET),
720: Integer.valueOf(mStateIndex)));
721: }
722: }
723:
724: /**
725: * Handle any special processing after an accept().
726: * @return boolean that signals if processing is complete.
727: */
728: boolean handleAccept(DeliveryChannelImpl channel)
729: throws javax.jbi.messaging.MessagingException {
730: boolean done = false;
731:
732: mSynchState = MessageExchangeProxy.NONE;
733: if (channel.isTransactional() && can(RESUME_TX)) {
734: mMEI.resumeTX();
735: }
736: if (this .can(CHECK_STATUS_OR_FAULT)) {
737: if (this .getFault() != null) {
738: mPhase = PHASE_FAULT;
739: mPhaseMask |= PM_RECEIVE_FAULT;
740: this .nextState(ACTION_FAULT);
741: } else if (!this .getStatus().equals(ExchangeStatus.ACTIVE)) {
742: if (this .getStatus().equals(ExchangeStatus.DONE)) {
743: mPhase = PHASE_DONE;
744: mPhaseMask |= PM_RECEIVE_DONE;
745: } else {
746: mPhase = PHASE_ERROR;
747: mPhaseMask |= PM_RECEIVE_ERROR;
748: }
749: this .nextState(ACTION_STATUS);
750: } else {
751: mPhase = PHASE_REPLY;
752: mPhaseMask |= PM_RECEIVE_REPLY;
753: this .nextState(ACTION_ACCEPT);
754: }
755: } else {
756: if (can(REQUEST)) {
757: mPhase = PHASE_REQUEST;
758: mPhaseMask |= PM_RECEIVE_REQUEST;
759: } else if (can(STATUS)) {
760: if (this .getStatus().equals(ExchangeStatus.DONE)) {
761: mPhase = PHASE_DONE;
762: mPhaseMask |= PM_RECEIVE_DONE;
763: } else {
764: mPhase = PHASE_ERROR;
765: mPhaseMask |= PM_RECEIVE_ERROR;
766: }
767: }
768: this .nextState(ACTION_ACCEPT);
769: }
770:
771: if (can(MARK_DONE)) {
772: done = true;
773: if (can(COMPLETE)) {
774: if ((mState[0][ACTION_MASK] & DO_ACCEPT) == 0) {
775: mPhaseMask = mTwin.mPhaseMask;
776: }
777: }
778: }
779: return (done);
780: }
781:
782: void setEndpointLink(ServiceEndpoint se) {
783: mEndpointLink = se;
784: }
785:
786: ServiceEndpoint getEndpointLink() {
787: return mEndpointLink;
788: }
789:
790: /**
791: * Handle any special processing after an send().
792: * @return boolean that signals if processing is complete.
793: */
794: boolean handleSend(DeliveryChannelImpl channel)
795: throws javax.jbi.messaging.MessagingException {
796: mSynchState = MessageExchangeProxy.NONE;
797: if (channel.isTransactional() && can(SUSPEND_TX)) {
798: mMEI.suspendTX();
799: }
800:
801: this .nextState(ACTION_SEND);
802: if (can(MARK_ACTIVE)) {
803: mMEI.setStatus(ExchangeStatus.ACTIVE);
804: }
805: return (can(MARK_DONE));
806: }
807:
808: /**
809: * Handle any special processing after an sendSync().
810: */
811: void handleSendSync(DeliveryChannelImpl channel)
812: throws javax.jbi.messaging.MessagingException {
813: mSynchState = MessageExchangeProxy.NONE;
814: if (channel.isTransactional() && can(SUSPEND_TX)) {
815: mMEI.suspendTX();
816: }
817:
818: this .nextState(ACTION_SEND);
819: if (can(MARK_ACTIVE)) {
820: mMEI.setStatus(ExchangeStatus.ACTIVE);
821: }
822: }
823:
824: void beforeCapabilityCheck(ServiceEndpoint se) {
825: mStateIndex = -1;
826: mMEI.setEndpoint(se);
827: }
828:
829: void afterCapabilityCheck() {
830: mStateIndex = 0;
831: mMEI.setEndpoint(null);
832: }
833:
834: public boolean isRemoteInvocation() {
835: return (mMEI.isRemote());
836: }
837:
838: public boolean checkTimeout() {
839: boolean timedout = false;
840:
841: synchronized (this ) {
842: if (mSynchState == MessageExchangeProxy.NONE) {
843: mSynchState = MessageExchangeProxy.ERROR;
844: timedout = true;
845: }
846: }
847: return (timedout);
848: }
849:
850: public Set getDeltaProperties() {
851: return (mMEI.getDeltaProperties());
852: }
853:
854: public void mergeProperties() {
855: mMEI.mergeProperties();
856: }
857:
858: void setInUse(String ownerId) {
859: if (getRole().equals(MessageExchange.Role.CONSUMER)) {
860: if (mEndpointLink != null) {
861: ((LinkedEndpoint) mEndpointLink).setInUse(ownerId);
862: }
863: } else {
864: ((RegisteredEndpoint) mMEI.getEndpoint()).setInUse();
865: }
866: }
867:
868: void resetInUse() {
869: if (getRole().equals(MessageExchange.Role.CONSUMER)) {
870: if (mEndpointLink != null) {
871: ((RegisteredEndpoint) mEndpointLink).resetInUse();
872: }
873: } else {
874: ((RegisteredEndpoint) mMEI.getEndpoint()).resetInUse();
875: }
876: }
877:
878: boolean capture(byte consumerTag, byte providerTag) {
879: if (mStatisticsEnabled) {
880: mMEI
881: .capture((mState[0][ACTION_MASK] & DO_ACCEPT) == 0 ? consumerTag
882: : providerTag);
883: }
884: return (can(COMPLETE));
885: }
886:
887: void updateStatistics() {
888: METimestamps ts = mMEI.getTimestamps();
889:
890: if (ts != null) {
891: ts.compute();
892: if ((mState[0][ACTION_MASK] & DO_ACCEPT) == 0) {
893: mSendChannel.updateProviderStatistics(ts);
894: mTwin.mSendChannel.updateConsumerStatistics(ts);
895: } else {
896: mSendChannel.updateConsumerStatistics(ts);
897: mTwin.mSendChannel.updateProviderStatistics(ts);
898: }
899: }
900: mSendChannel = null;
901: mTwin.mSendChannel = null;
902: mTwin.mTwin = null;
903: mTwin = null;
904: }
905:
906: METimestamps getTimestamps() {
907: return (mMEI.getTimestamps());
908: }
909:
910: public String toString() {
911: StringBuilder sb = new StringBuilder();
912:
913: sb.append(" ExchangeId: ");
914: sb.append(mMEI.getExchangeId());
915: sb.append("\n Pattern: ");
916: sb.append(getPattern());
917: sb.append("\n SendChannel: ");
918: sb.append(mSendChannel == null ? "Null" : mSendChannel
919: .getChannelId());
920: sb.append("\n Role: ");
921: sb.append(getRole().equals(Role.CONSUMER) ? "Consumer"
922: : "Provider");
923: sb.append(" Index: ");
924: sb.append(mStateIndex);
925: sb.append(" SynchState: ");
926: sb
927: .append(mSynchState == NONE ? "NONE"
928: : (mSynchState == WAIT ? "WAIT"
929: : (mSynchState == WAIT_TIMEOUT ? "WAIT_TIMEOUT"
930: : (mSynchState == HALF_DONE ? "HALF_DONE"
931: : (mSynchState == DONE ? "DONE"
932: : "ERROR")))));
933: sb.append("\n EndpointLink: ");
934: sb.append(mEndpointLink == null ? "Null" : mEndpointLink
935: .toString());
936: sb.append(mMEI.toString());
937: return (sb.toString());
938: }
939: }
|