001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)SimpleServicelist.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.engine.sequencing.servicelist;
030:
031: import com.sun.jbi.engine.sequencing.MessageRegistry;
032: import com.sun.jbi.engine.sequencing.SequencingEngineContext;
033: import com.sun.jbi.engine.sequencing.SequencingEngineResources;
034: import com.sun.jbi.engine.sequencing.framework.Servicelist;
035: import com.sun.jbi.engine.sequencing.util.ConfigData;
036: import com.sun.jbi.engine.sequencing.util.MessageExchangeHelper;
037: import com.sun.jbi.engine.sequencing.util.SequencingEngineUtil;
038: import com.sun.jbi.engine.sequencing.util.StringTranslator;
039:
040: import java.util.Timer;
041: import java.util.TimerTask;
042: import java.util.logging.Logger;
043:
044: import javax.jbi.messaging.DeliveryChannel;
045: import javax.jbi.messaging.ExchangeStatus;
046: import javax.jbi.messaging.Fault;
047: import javax.jbi.messaging.MessageExchange;
048: import javax.jbi.messaging.MessagingException;
049: import javax.jbi.messaging.NormalizedMessage;
050: import javax.jbi.servicedesc.ServiceEndpoint;
051:
052: import javax.transaction.Transaction;
053:
054: import javax.xml.namespace.QName;
055:
056: /**
057: * Class Servicelist. This class has the logic for exetiing the service list
058: * This is a simple implementation which executes the services in a sequence.
059: *
060: * @author Sun Microsystems, Inc.
061: */
062: public class SimpleServicelist implements Servicelist,
063: SequencingEngineResources {
064: /**
065: * Engine Channel.
066: */
067: private DeliveryChannel mChannel;
068:
069: /**
070: * Object to store the last occurred error.
071: */
072: private Exception mLastError;
073:
074: /**
075: * Object to store the last fault message.
076: */
077: private Fault mLastFaultMessage;
078:
079: /**
080: * Logger object.
081: */
082: private Logger mLog;
083:
084: /**
085: * Object to keep track of current exchange.
086: */
087: private MessageExchange mCurrentExchange;
088:
089: /**
090: * Keeps track of the inbound exchange.
091: */
092: private MessageExchange mInboundExchange;
093:
094: /**
095: * Message registry object.
096: */
097: private MessageRegistry mMessageRegistry;
098:
099: /**
100: * Last input message.
101: */
102: private NormalizedMessage mLastInputMessage;
103:
104: /**
105: * Final output message.
106: */
107: private NormalizedMessage mLastOutputMessage;
108:
109: /**
110: * Current service being executed.
111: */
112: private ServiceBean mCurrentService;
113:
114: /**
115: * Service list information.
116: */
117: private ServicelistBean mListBean;
118:
119: /**
120: * Sequence id for every list.
121: */
122: private String mSequenceId;
123:
124: /**
125: * Translator object for internationalization.
126: */
127: private StringTranslator mTranslator;
128:
129: /**
130: * Timer object for tracking time-out.
131: */
132: private Timer mTimer;
133:
134: /**
135: * Current transaction.
136: */
137: private Transaction mCurrentTransaction;
138:
139: /**
140: * True if a response is expected false otherwise
141: */
142: private boolean mResponseExpected;
143:
144: /**
145: * Indicates if an exchange has timed out.
146: */
147: private boolean mTimedOut = false;
148:
149: /**
150: * The service index number.
151: */
152: private int mServiceIndex = 0;
153:
154: /**
155: * State.
156: */
157: private int mState;
158:
159: /**
160: *
161: */
162:
163: /**
164: *
165: */
166: private int mType;
167:
168: /**
169: * Creates a new Servicelist object.
170: */
171: public SimpleServicelist() {
172: mTimer = new Timer();
173: mLog = SequencingEngineContext.getInstance().getLogger();
174: mTranslator = new StringTranslator();
175: mMessageRegistry = MessageRegistry.getInstance();
176: mChannel = SequencingEngineContext.getInstance().getChannel();
177: mSequenceId = SequencingEngineUtil.getTrackingId();
178: setState(ServicelistState.READY);
179: }
180:
181: /**
182: * Returns the current exchange object that is being processed.
183: *
184: * @return message exchange
185: */
186: public MessageExchange getCurrentExchange() {
187: return mCurrentExchange;
188: }
189:
190: /**
191: * Returns the current service object, thats being executed.
192: *
193: * @return service bean object.
194: */
195: public ServiceBean getCurrentService() {
196: return mCurrentService;
197: }
198:
199: /**
200: * Returns the deployment ID associated with this servicelist.
201: *
202: * @return deployment id
203: */
204: public String getDeploymentId() {
205: if (mListBean != null) {
206: return mListBean.getDeploymentId();
207: } else {
208: return null;
209: }
210: }
211:
212: /**
213: * Setter for property mInboundExchange.
214: *
215: * @param mInboundExchange New value of property mInboundExchange.
216: */
217: public void setInboundExchange(
218: javax.jbi.messaging.MessageExchange mInboundExchange) {
219: this .mInboundExchange = mInboundExchange;
220: }
221:
222: /**
223: * Getter for property mInboundExchange.
224: *
225: * @return Value of property mInboundExchange.
226: */
227: public javax.jbi.messaging.MessageExchange getInboundExchange() {
228: return mInboundExchange;
229: }
230:
231: /**
232: * returnd the list name that is being executed in this object.
233: *
234: * @return string list name
235: */
236: public String getListName() {
237: if (mListBean != null) {
238: return mListBean.getServicename();
239: } else {
240: return null;
241: }
242: }
243:
244: /**
245: * sets the message exchange to be processed in this list.
246: *
247: * @param msgex msg exchange
248: */
249: public void setMessageExchange(MessageExchange msgex) {
250: mCurrentExchange = msgex;
251: }
252:
253: /**
254: * sets the servicelist bean.
255: *
256: * @param listbean service list bean.
257: */
258: public void setServicelistBean(ServicelistBean listbean) {
259: mListBean = listbean;
260: }
261:
262: /**
263: * Returns the servicelist bean.
264: *
265: * @return list bean
266: */
267: public ServicelistBean getServicelistBean() {
268: return mListBean;
269: }
270:
271: /**
272: * Sets state.
273: *
274: * @param state state.
275: */
276: public void setState(int state) {
277: mState = state;
278: }
279:
280: /**
281: * Returns the current state od the service list.
282: *
283: * @return state
284: */
285: public int getState() {
286: return 0;
287: }
288:
289: /**
290: * Sets the transaction context.
291: *
292: * @param tran transaction.
293: */
294: public void setTransactionContext(Transaction tran) {
295: mCurrentTransaction = tran;
296: }
297:
298: /**
299: * sets the type.
300: *
301: * @param type request or response
302: */
303: public void setType(int type) {
304: mType = type;
305: }
306:
307: /**
308: * Cancels the message exchange.
309: */
310: public void cancelExchange() {
311: mLog.info(mTranslator.getString(SEQ_SERVICE_TIMED_OUT,
312: mCurrentExchange.getExchangeId(), mCurrentExchange
313: .getEndpoint().getServiceName()));
314: mMessageRegistry.deregisterExchange(mCurrentExchange
315: .getExchangeId()
316: + mSequenceId);
317: mMessageRegistry.registerTimedOutExchange(mCurrentExchange
318: .getExchangeId()
319: + mSequenceId);
320:
321: mLastError = new Exception(mTranslator.getString(
322: SEQ_SERVICE_TIMED_OUT,
323: mCurrentExchange.getExchangeId(), mCurrentExchange
324: .getEndpoint().getServiceName()));
325: mTimer.cancel();
326: sendListError();
327: mListBean.updateState(mSequenceId, mState);
328: }
329:
330: /**
331: * This is the command pattern implementation method which is invoked when
332: * this object is run in a thread pool.
333: */
334: public void execute() {
335: /* This method will be executed in a separate thread.
336: * Every new request / response for the service list will
337: * result in invocation of this method.
338: */
339: mLog.fine(this + "Executing message "
340: + mCurrentExchange.getExchangeId());
341:
342: if (mChannel == null) {
343: mLog.severe(mTranslator.getString(SEQ_EXCHANGE_NOT_SET));
344:
345: return;
346: }
347:
348: if (mCurrentExchange == null) {
349: mLog.severe(mTranslator.getString(SEQ_CHANNEL_NOT_SET));
350:
351: return;
352: }
353:
354: if (mListBean == null) {
355: mLog.severe(mTranslator.getString(SEQ_BEAN_NOT_SET));
356:
357: return;
358: }
359:
360: if (hasTimedOut()) {
361: mLog.info(mTranslator.getString(SEQ_SERVICE_TIMED_OUT,
362: mCurrentExchange.getExchangeId(), mCurrentExchange
363: .getEndpoint().getServiceName()));
364: setState(ServicelistState.TIMED_OUT);
365: try {
366: if (mCurrentExchange.getStatus().equals(
367: ExchangeStatus.ACTIVE)) {
368: mCurrentExchange.setStatus(ExchangeStatus.DONE);
369: }
370: mChannel.send(mCurrentExchange);
371: } catch (Exception tee) {
372: mLog
373: .severe("Unable to respond back to timed out exchange");
374: //dont do anythign, we tried our best to end this
375: // in a sane way.
376: }
377:
378: mMessageRegistry
379: .deregisterTimedOutExchange(mCurrentExchange
380: .getExchangeId()
381: + mSequenceId);
382: /**
383: *
384: * The person who sent this message might ne waiting for
385: * a response back , so send it
386: */
387: return;
388: }
389:
390: /* this is the possible states and the MEPs that can get here
391: * and their interpretation
392: */
393: /*
394: * ---------------------------------------------------------------
395: * MEP - > In-Out In-Only Robust-In-Only
396: * State
397: * ---------------------------------------------------------------
398: *
399: * ACTIVE New Request / New Request New Request
400: * Response
401: *
402: * ERROR Response Failure Status Reponse Failed
403: * completed (stage 2)/ response failed Completed (stg 2)/
404: * Status failed Fault status failed
405: * (stage 3 of MEP) (stg 3)
406: *
407: * DONE Status success Status success Status success completed
408: * completed
409: */
410: setState(ServicelistState.RUNNING);
411: resetTimer();
412:
413: if (mType == ConfigData.REQUEST_TYPE) {
414: processRequest();
415: } else if (mType == ConfigData.RESPONSE_TYPE) {
416: processResponse();
417: } else {
418: mLog.severe(mTranslator.getString(SEQ_UNSUPPORTED_MEP));
419: setState(ServicelistState.ERROR);
420: }
421:
422: mListBean.updateState(mSequenceId, mState);
423: mLog.info(this + mTranslator.getString(SEQ_FINE_EXIT_EXECUTE));
424: }
425:
426: /**
427: * Executes the service with given index in the list.
428: *
429: * @param index service index
430: */
431: public void executeService(int index) {
432: mLog.info(this
433: + mTranslator.getString(SEQ_FINE_EXECUTING_SERVICE)
434: + index);
435:
436: if (index > (mListBean.getServiceCount() - 1)) {
437: mLog.info(mTranslator
438: .getString(SEQ_FINE_SERVICE_EXECUTION_DONE));
439: sendListResponse();
440:
441: return;
442: }
443:
444: try {
445: mCurrentService = mListBean.getService(index);
446: } catch (Exception e) {
447: mLog.severe(mTranslator
448: .getString(SEQ_SEVERE_CANNOT_GET_SERVICE_INFO));
449: e.printStackTrace();
450: mLastError = new Exception(mTranslator
451: .getString(SEQ_SEVERE_CANNOT_GET_SERVICE_INFO));
452: sendListError();
453:
454: return;
455: }
456:
457: MessageExchange req = MessageExchangeHelper
458: .createExchange(mCurrentService.getMep());
459:
460: try {
461: if (!MessageExchangeHelper.updateInMessage(req,
462: mLastInputMessage)) {
463: mLog.severe(mTranslator.getString(SEQ_UPDATE_NM_FAILED,
464: mCurrentService.getName()));
465: mLastError = new Exception(mTranslator
466: .getString(SEQ_UPDATE_NM_FAILED,
467: mCurrentService.getName()));
468: sendListError();
469:
470: return;
471: }
472:
473: req.setOperation(new QName(mCurrentService
474: .getOperationNamespace(), mCurrentService
475: .getOperation()));
476: req.setService(new QName(mCurrentService.getNamespace(),
477: mCurrentService.getName()));
478:
479: /**
480: * The following is the logic an intelligent component adopts to
481: * pick an endpoint. First use the service name to identify the
482: * endpoint list. Then try to see if any endpoint name matches
483: * the one configured. IF not lucky then try the same process with
484: * the interface name. Still not lucky, then our message does not
485: * have an address, so cannot send it
486: */
487: ServiceEndpoint ref = getServiceEndpoint(mCurrentService
488: .getNamespace(), mCurrentService.getName(),
489: mCurrentService.getEndpointName());
490:
491: if (ref == null) {
492: ref = getInterfaceEndpoint(mCurrentService
493: .getInterfaceNamespace(), mCurrentService
494: .getInterfaceName(), mCurrentService
495: .getEndpointName());
496: }
497:
498: /*
499: * if ref is null, it means that either the service is
500: * not activated
501: */
502: if (ref == null) {
503: mLog.info(mTranslator.getString(
504: SEQ_CANNOT_GET_ENDPOINT, mCurrentService
505: .getNamespace()
506: + mCurrentService.getName(),
507: mCurrentService.getEndpointName()));
508: mLastError = new Exception(mTranslator.getString(
509: SEQ_CANNOT_GET_ENDPOINT, mCurrentService
510: .getName()));
511: sendListError();
512:
513: return;
514: }
515:
516: req.setEndpoint(ref);
517:
518: if (!send(req, mCurrentService.getTimeout())) {
519: mLog.severe(mTranslator.getString(SEQ_SEND_FAILED));
520: sendListError();
521: }
522: } catch (Exception e) {
523: e.printStackTrace();
524:
525: /*mLog.severe(mTranslator.getString(SEQ_SEND_FAILED) + " "
526: + mCurrentService.getServiceReference().getServiceName()
527: .toString());
528: */
529: mLastError = e;
530:
531: sendListError();
532: }
533: }
534:
535: /**
536: * returns the value of mTimedOut.
537: *
538: * @return true if the service has timed out
539: */
540: public boolean hasTimedOut() {
541: return mTimedOut;
542: }
543:
544: /**
545: * Processes the response.
546: */
547: public void processResponse() {
548: mLog
549: .info(this
550: + mTranslator
551: .getString(SEQ_PROCESSING_RESPONSE));
552:
553: if (mCurrentExchange.getStatus() == ExchangeStatus.DONE) {
554: /* terminal condition for service in-out and in-only
555: */
556: mLog.info(mCurrentExchange.getEndpoint().getServiceName()
557: + " DONE ");
558: mServiceIndex++;
559: executeService(mServiceIndex);
560:
561: return;
562: }
563:
564: if (mCurrentExchange.getStatus() == ExchangeStatus.ERROR) {
565: mLog.info(mCurrentExchange.getEndpoint().getServiceName()
566: + " ERROR ");
567: mLastError = mCurrentExchange.getError();
568: mLastFaultMessage = mCurrentExchange.getFault();
569: sendListError();
570:
571: return;
572: }
573:
574: mLastOutputMessage = MessageExchangeHelper
575: .getOutMessage(mCurrentExchange);
576: mLastFaultMessage = mCurrentExchange.getFault();
577:
578: if (mLastOutputMessage == null) {
579: if (mLastFaultMessage != null) {
580: sendListResponse();
581: } else {
582: sendListError();
583: }
584: } else {
585: mLastInputMessage = mLastOutputMessage;
586: }
587:
588: try {
589: mCurrentExchange.setStatus(ExchangeStatus.DONE);
590: } catch (Exception e) {
591: e.printStackTrace();
592: }
593:
594: if (send(mCurrentExchange, mCurrentService.getTimeout())) {
595: mServiceIndex++;
596: executeService(mServiceIndex);
597: } else {
598: sendListError();
599: }
600:
601: mLog.fine(this + "Process response completed");
602: }
603:
604: /**
605: *
606: */
607: public void resumeTX() {
608: if (mCurrentTransaction != null) {
609: try {
610: mLog.fine(mTranslator.getString(SEQ_RESUME_TX,
611: mCurrentExchange.getExchangeId()));
612: SequencingEngineContext.getInstance()
613: .getTransactionManager().resume(
614: mCurrentTransaction);
615: } catch (javax.transaction.SystemException se) {
616: mLog.severe(mTranslator.getString(SEQ_RESUME_TX_FAILED,
617: mCurrentExchange.getExchangeId()));
618: se.printStackTrace();
619: } catch (javax.transaction.InvalidTransactionException ite) {
620: mLog.severe(mTranslator.getString(SEQ_RESUME_TX_FAILED,
621: mCurrentExchange.getExchangeId()));
622: ite.printStackTrace();
623: }
624: }
625: }
626:
627: /**
628: * sends an error message for list execution.
629: */
630: public void sendListError() {
631: mLog.severe(this + "Sending list error");
632:
633: if ((mLastError == null) && (mLastFaultMessage == null)) {
634: mLastError = new Exception("Unknown error");
635: }
636:
637: MessageExchangeHelper.updateMessage(mInboundExchange, null,
638: mLastError, mLastFaultMessage);
639: send(mInboundExchange, -1);
640: setState(ServicelistState.ERROR);
641: }
642:
643: /**
644: * sends the response for the list execution.
645: */
646: public void sendListResponse() {
647: mLog.info(this + mTranslator.getString(SEQ_SEND_LIST_RESPONSE));
648: mLastError = null;
649:
650: if ((mLastInputMessage == null) && (mLastFaultMessage == null)) {
651: mLastError = new Exception(mTranslator
652: .getString(SEQ_NULL_OUT_MESSAGE));
653: }
654:
655: MessageExchangeHelper.updateMessage(mInboundExchange,
656: mLastOutputMessage, mLastError, mLastFaultMessage);
657: send(mInboundExchange, -1);
658: setState(ServicelistState.COMPLETED);
659: }
660:
661: /**
662: * starts the timer.
663: *
664: * @param timeout timeout , if -1 then timer is not started
665: */
666: public void startTimer(long timeout) {
667: mTimer.cancel();
668: mTimer = new Timer();
669: mTimedOut = false;
670:
671: if (timeout != 0) {
672: mTimer.schedule(new ReminderTask(), timeout);
673: }
674: }
675:
676: /**
677: * helper to get the interface endpoint for a service.
678: *
679: * @param namespace service namespace
680: * @param name service name
681: * @param epname endpoint name
682: *
683: * @return ServiceEndpoint
684: */
685: private ServiceEndpoint getInterfaceEndpoint(String namespace,
686: String name, String epname) {
687: ServiceEndpoint ep = null;
688:
689: try {
690: ServiceEndpoint[] ref = SequencingEngineContext
691: .getInstance().getContext().getEndpoints(
692: new QName(namespace, name));
693:
694: if ((ref == null) || (ref.length == 0)) {
695: return null;
696: } else {
697: ep = ref[0];
698:
699: for (int k = 0; k < ref.length; k++) {
700: if (ref[k].getEndpointName().trim().equals(
701: epname.trim())) {
702: ep = ref[k];
703: }
704: }
705: }
706: } catch (Exception e) {
707: mLog.info(mTranslator.getString(SEQ_CANNOT_GET_ENDPOINT,
708: namespace, name));
709: e.printStackTrace();
710:
711: return null;
712: }
713:
714: return ep;
715: }
716:
717: /**
718: * helper to get the local endpoint for a service.
719: *
720: * @param namespace service namespace
721: * @param name service name
722: * @param epname endpoint name
723: *
724: * @return ServiceEndpoint
725: */
726: private ServiceEndpoint getServiceEndpoint(String namespace,
727: String name, String epname) {
728: ServiceEndpoint ep = null;
729:
730: try {
731: ServiceEndpoint[] ref = SequencingEngineContext
732: .getInstance().getContext().getEndpointsForService(
733: new QName(namespace, name));
734:
735: if ((ref == null) || (ref.length == 0)) {
736: return null;
737: } else {
738: ep = ref[0];
739:
740: for (int k = 0; k < ref.length; k++) {
741: if (ref[k].getEndpointName().trim().equals(
742: epname.trim())) {
743: ep = ref[k];
744: }
745: }
746: }
747: } catch (Exception e) {
748: mLog.info(mTranslator.getString(SEQ_CANNOT_GET_ENDPOINT,
749: namespace, name));
750: e.printStackTrace();
751:
752: return null;
753: }
754:
755: return ep;
756: }
757:
758: /**
759: * Processed any request exchanges.
760: */
761: private void processRequest() {
762: if (mCurrentExchange.getStatus() == ExchangeStatus.DONE) {
763: mLog.info(this .toString()
764: + mTranslator.getString(SEQ_LIST_COMPLETED,
765: this .mListBean.getServicename()));
766: mState = ServicelistState.COMPLETED;
767: return;
768: }
769:
770: if (mCurrentExchange.getStatus() == ExchangeStatus.ERROR) {
771: mLog.severe(mTranslator.getString(SEQ_LIST_COMPLETED_ERROR,
772: this .mListBean.getServicename()));
773: mState = ServicelistState.ERROR;
774: return;
775: }
776:
777: mInboundExchange = mCurrentExchange;
778: mLastInputMessage = MessageExchangeHelper
779: .getInMessage(mCurrentExchange);
780:
781: if (mLastInputMessage == null) {
782: mLog.severe(mTranslator.getString(SEQ_NO_INPUT_MESSAGE,
783: this .mListBean.getServicename()));
784: MessageExchangeHelper.updateMessage(mCurrentExchange, null,
785: new Exception(mTranslator.getString(
786: SEQ_NO_INPUT_MESSAGE, this .mListBean
787: .getServicename())), null);
788: send(mCurrentExchange, -1);
789: mState = ServicelistState.ERROR;
790:
791: return;
792: }
793:
794: executeService(mServiceIndex);
795: }
796:
797: /**
798: * Registers the message exchange.
799: *
800: * @param msg ME
801: */
802: private void registerExchange(MessageExchange msg) {
803: String reqid = (String) msg
804: .getProperty(ConfigData.REQUEST_SEQ_ID);
805: String respid = (String) msg
806: .getProperty(ConfigData.RESPONSE_SEQ_ID);
807:
808: if (respid != null) {
809: mMessageRegistry.registerExchange(msg.getExchangeId()
810: + respid, this );
811: } else if (reqid != null) {
812: mMessageRegistry.registerExchange(msg.getExchangeId()
813: + reqid, this );
814: } else {
815: mLog.severe("Registration info missing");
816: }
817: }
818:
819: /**
820: * DeRegisters the message exchange.
821: *
822: * @param msg ME
823: */
824: private void deregisterExchange(MessageExchange msg) {
825: String reqid = (String) msg
826: .getProperty(ConfigData.REQUEST_SEQ_ID);
827: String respid = (String) msg
828: .getProperty(ConfigData.RESPONSE_SEQ_ID);
829:
830: if (respid != null) {
831: mMessageRegistry.deregisterExchange(msg.getExchangeId()
832: + respid);
833: } else if (reqid != null) {
834: mMessageRegistry.deregisterExchange(msg.getExchangeId()
835: + reqid);
836: } else {
837: mLog.severe("Registration info missing");
838: }
839: }
840:
841: /**
842: * Resets the timer.
843: */
844: private void resetTimer() {
845: if (mTimer != null) {
846: mTimer.cancel();
847: }
848: }
849:
850: /**
851: * Method that sends any message exchnage on the channel.
852: *
853: * @param msg exchange to be sent
854: * @param timeout timeout for that exchange
855: *
856: * @return true if success else false
857: */
858: private boolean send(MessageExchange msg, long timeout) {
859: /*
860: * Send message mSendMessage
861: * register in message registry
862: * start timer -1 indicates no timer
863: */
864: if (msg == null) {
865: mLog.severe(SEQ_NULL_EXCHANGE);
866:
867: return false;
868: }
869:
870: try {
871: mLog.info(this .toString()
872: + mTranslator.getString(SEQ_FINE_SEND_MESSAGE, msg
873: .getEndpoint().getServiceName()));
874:
875: if ((msg.getStatus() == ExchangeStatus.DONE)
876: || (msg.getStatus() == ExchangeStatus.ERROR)) {
877: mLog.info(this .toString()
878: + mTranslator.getString(SEQ_FINE_SEND_ACTIVE,
879: msg.getStatus().toString()));
880: mResponseExpected = false;
881: } else {
882: registerExchange(msg);
883: setState(ServicelistState.WAITING);
884: mResponseExpected = true;
885: }
886:
887: mCurrentExchange = msg;
888: msg.setProperty(
889: MessageExchange.JTA_TRANSACTION_PROPERTY_NAME,
890: mCurrentTransaction);
891: resumeTX();
892: mChannel.send(msg);
893: // mMessageRegistry.dump();
894: } catch (MessagingException me) {
895: mLog.severe(mTranslator.getString(SEQ_SEND_FAILED));
896: mLog.severe(me.getMessage());
897: mLastError = me;
898: me.printStackTrace();
899: mResponseExpected = false;
900: setState(ServicelistState.ERROR);
901: // if you cannot send the message for some reason
902: // deregister it
903: deregisterExchange(msg);
904: return false;
905: }
906:
907: if ((mResponseExpected) && (timeout != -1)) {
908: startTimer(timeout);
909: }
910:
911: return true;
912: }
913:
914: /**
915: * This is a reminder task for the timer.
916: *
917: * @author Sun Microsystems, Inc.
918: */
919: public class ReminderTask extends TimerTask {
920: /**
921: * run method for the thread.
922: */
923: public void run() {
924: /* call some method
925: * that will deregister the message from
926: * message registry and return an error response
927: */
928: mTimedOut = true;
929: cancelExchange();
930: }
931: }
932: }
|