001: // Copyright (c) 2004-2005 Sun Microsystems Inc., All Rights Reserved.
002:
003: /*
004: * SimpleServicelist.java
005: *
006: * SUN PROPRIETARY/CONFIDENTIAL
007: * This software is the proprietary information of Sun Microsystems, Inc.
008: * Use is subject to license term
009: */
010: package com.sun.jbi.engine.sequencing.servicelist;
011:
012: import com.sun.jbi.engine.sequencing.MessageRegistry;
013: import com.sun.jbi.engine.sequencing.SequencingEngineContext;
014: import com.sun.jbi.engine.sequencing.SequencingEngineResources;
015: import com.sun.jbi.engine.sequencing.framework.Servicelist;
016: import com.sun.jbi.engine.sequencing.util.ConfigData;
017: import com.sun.jbi.engine.sequencing.util.MessageExchangeHelper;
018: import com.sun.jbi.engine.sequencing.util.SequencingEngineUtil;
019: import com.sun.jbi.engine.sequencing.util.StringTranslator;
020:
021: import java.util.Timer;
022: import java.util.TimerTask;
023: import java.util.logging.Logger;
024:
025: import javax.jbi.messaging.DeliveryChannel;
026: import javax.jbi.messaging.ExchangeStatus;
027: import javax.jbi.messaging.Fault;
028: import javax.jbi.messaging.MessageExchange;
029: import javax.jbi.messaging.MessagingException;
030: import javax.jbi.messaging.NormalizedMessage;
031: import javax.jbi.servicedesc.ServiceEndpoint;
032:
033: import javax.transaction.Transaction;
034:
035: import javax.xml.namespace.QName;
036:
037: /**
038: * Class Servicelist. This class has the logic for exetiing the service list
039: * This is a simple implementation which executes the services in a sequence.
040: *
041: * @author Sun Microsystems, Inc.
042: */
043: public class SimpleServicelist implements Servicelist,
044: SequencingEngineResources {
045: /**
046: * Engine Channel.
047: */
048: private DeliveryChannel mChannel;
049:
050: /**
051: * Object to store the last occurred error.
052: */
053: private Exception mLastError;
054:
055: /**
056: * Object to store the last fault message.
057: */
058: private Fault mLastFaultMessage;
059:
060: /**
061: * Logger object.
062: */
063: private Logger mLog;
064:
065: /**
066: * Object to keep track of current exchange.
067: */
068: private MessageExchange mCurrentExchange;
069:
070: /**
071: * Keeps track of the inbound exchange.
072: */
073: private MessageExchange mInboundExchange;
074:
075: /**
076: * Message registry object.
077: */
078: private MessageRegistry mMessageRegistry;
079:
080: /**
081: * Last input message.
082: */
083: private NormalizedMessage mLastInputMessage;
084:
085: /**
086: * Final output message.
087: */
088: private NormalizedMessage mLastOutputMessage;
089:
090: /**
091: * Current service being executed.
092: */
093: private ServiceBean mCurrentService;
094:
095: /**
096: * Service list information.
097: */
098: private ServicelistBean mListBean;
099:
100: /**
101: * Sequence id for every list.
102: */
103: private String mSequenceId;
104:
105: /**
106: * Translator object for internationalization.
107: */
108: private StringTranslator mTranslator;
109:
110: /**
111: * Timer object for tracking time-out.
112: */
113: private Timer mTimer;
114:
115: /**
116: * Current transaction.
117: */
118: private Transaction mCurrentTransaction;
119:
120: /**
121: * True if a response is expected false otherwise
122: */
123: private boolean mResponseExpected;
124:
125: /**
126: * Indicates if an exchange has timed out.
127: */
128: private boolean mTimedOut = false;
129:
130: /**
131: * The service index number.
132: */
133: private int mServiceIndex = 0;
134:
135: /**
136: * State.
137: */
138: private int mState;
139:
140: /**
141: *
142: */
143:
144: /**
145: *
146: */
147: private int mType;
148:
149: /**
150: * Creates a new Servicelist object.
151: */
152: public SimpleServicelist() {
153: mTimer = new Timer();
154: mLog = SequencingEngineContext.getInstance().getLogger();
155: mTranslator = new StringTranslator();
156: mMessageRegistry = MessageRegistry.getInstance();
157: mChannel = SequencingEngineContext.getInstance().getChannel();
158: mSequenceId = SequencingEngineUtil.getTrackingId();
159: setState(ServicelistState.READY);
160: }
161:
162: /**
163: * Returns the current exchange object that is being processed.
164: *
165: * @return message exchange
166: */
167: public MessageExchange getCurrentExchange() {
168: return mCurrentExchange;
169: }
170:
171: /**
172: * Returns the current service object, thats being executed.
173: *
174: * @return service bean object.
175: */
176: public ServiceBean getCurrentService() {
177: return mCurrentService;
178: }
179:
180: /**
181: * Returns the deployment ID associated with this servicelist.
182: *
183: * @return deployment id
184: */
185: public String getDeploymentId() {
186: if (mListBean != null) {
187: return mListBean.getDeploymentId();
188: } else {
189: return null;
190: }
191: }
192:
193: /**
194: * Setter for property mInboundExchange.
195: *
196: * @param mInboundExchange New value of property mInboundExchange.
197: */
198: public void setInboundExchange(
199: javax.jbi.messaging.MessageExchange mInboundExchange) {
200: this .mInboundExchange = mInboundExchange;
201: }
202:
203: /**
204: * Getter for property mInboundExchange.
205: *
206: * @return Value of property mInboundExchange.
207: */
208: public javax.jbi.messaging.MessageExchange getInboundExchange() {
209: return mInboundExchange;
210: }
211:
212: /**
213: * returnd the list name that is being executed in this object.
214: *
215: * @return string list name
216: */
217: public String getListName() {
218: if (mListBean != null) {
219: return mListBean.getServicename();
220: } else {
221: return null;
222: }
223: }
224:
225: /**
226: * sets the message exchange to be processed in this list.
227: *
228: * @param msgex msg exchange
229: */
230: public void setMessageExchange(MessageExchange msgex) {
231: mCurrentExchange = msgex;
232: }
233:
234: /**
235: * sets the servicelist bean.
236: *
237: * @param listbean service list bean.
238: */
239: public void setServicelistBean(ServicelistBean listbean) {
240: mListBean = listbean;
241: }
242:
243: /**
244: * Returns the servicelist bean.
245: *
246: * @return list bean
247: */
248: public ServicelistBean getServicelistBean() {
249: return mListBean;
250: }
251:
252: /**
253: * Sets state.
254: *
255: * @param state state.
256: */
257: public void setState(int state) {
258: mState = state;
259: }
260:
261: /**
262: * Returns the current state od the service list.
263: *
264: * @return state
265: */
266: public int getState() {
267: return 0;
268: }
269:
270: /**
271: * Sets the transaction context.
272: *
273: * @param tran transaction.
274: */
275: public void setTransactionContext(Transaction tran) {
276: mCurrentTransaction = tran;
277: }
278:
279: /**
280: * sets the type.
281: *
282: * @param type request or response
283: */
284: public void setType(int type) {
285: mType = type;
286: }
287:
288: /**
289: * Cancels the message exchange.
290: */
291: public void cancelExchange() {
292: mLog.info(mTranslator.getString(SEQ_SERVICE_TIMED_OUT,
293: mCurrentExchange.getExchangeId(), mCurrentExchange
294: .getEndpoint().getServiceName()));
295: mMessageRegistry.deregisterExchange(mCurrentExchange
296: .getExchangeId()
297: + mSequenceId);
298: mMessageRegistry.registerTimedOutExchange(mCurrentExchange
299: .getExchangeId()
300: + mSequenceId);
301:
302: mLastError = new Exception(mTranslator.getString(
303: SEQ_SERVICE_TIMED_OUT,
304: mCurrentExchange.getExchangeId(), mCurrentExchange
305: .getEndpoint().getServiceName()));
306: mTimer.cancel();
307: sendListError();
308: mListBean.updateState(mSequenceId, mState);
309: }
310:
311: /**
312: * This is the command pattern implementation method which is invoked when
313: * this object is run in a thread pool.
314: */
315: public void execute() {
316: /* This method will be executed in a separate thread.
317: * Every new request / response for the service list will
318: * result in invocation of this method.
319: */
320: mLog.fine(this + "Executing message "
321: + mCurrentExchange.getExchangeId());
322:
323: if (mChannel == null) {
324: mLog.severe(mTranslator.getString(SEQ_EXCHANGE_NOT_SET));
325:
326: return;
327: }
328:
329: if (mCurrentExchange == null) {
330: mLog.severe(mTranslator.getString(SEQ_CHANNEL_NOT_SET));
331:
332: return;
333: }
334:
335: if (mListBean == null) {
336: mLog.severe(mTranslator.getString(SEQ_BEAN_NOT_SET));
337:
338: return;
339: }
340:
341: if (hasTimedOut()) {
342: mLog.info(mTranslator.getString(SEQ_SERVICE_TIMED_OUT,
343: mCurrentExchange.getExchangeId(), mCurrentExchange
344: .getEndpoint().getServiceName()));
345: setState(ServicelistState.TIMED_OUT);
346:
347: return;
348: }
349:
350: /* this is the possible states and the MEPs that can get here
351: * and their interpretation
352: */
353: /*
354: * ---------------------------------------------------------------
355: * MEP - > In-Out In-Only Robust-In-Only
356: * State
357: * ---------------------------------------------------------------
358: *
359: * ACTIVE New Request / New Request New Request
360: * Response
361: *
362: * ERROR Response Failure Status Reponse Failed
363: * completed (stage 2)/ response failed Completed (stg 2)/
364: * Status failed Fault status failed
365: * (stage 3 of MEP) (stg 3)
366: *
367: * DONE Status success Status success Status success completed
368: * completed
369: */
370: setState(ServicelistState.RUNNING);
371: resetTimer();
372:
373: if (mType == ConfigData.REQUEST_TYPE) {
374: processRequest();
375: } else if (mType == ConfigData.RESPONSE_TYPE) {
376: processResponse();
377: } else {
378: mLog.severe(mTranslator.getString(SEQ_UNSUPPORTED_MEP));
379: setState(ServicelistState.ERROR);
380: }
381:
382: mListBean.updateState(mSequenceId, mState);
383: mLog.info(this + mTranslator.getString(SEQ_FINE_EXIT_EXECUTE));
384: }
385:
386: /**
387: * Executes the service with given index in the list.
388: *
389: * @param index service index
390: */
391: public void executeService(int index) {
392: mLog.info(this
393: + mTranslator.getString(SEQ_FINE_EXECUTING_SERVICE)
394: + index);
395:
396: if (index > (mListBean.getServiceCount() - 1)) {
397: mLog.info(mTranslator
398: .getString(SEQ_FINE_SERVICE_EXECUTION_DONE));
399: sendListResponse();
400:
401: return;
402: }
403:
404: try {
405: mCurrentService = mListBean.getService(index);
406: } catch (Exception e) {
407: mLog.severe(mTranslator
408: .getString(SEQ_SEVERE_CANNOT_GET_SERVICE_INFO));
409: e.printStackTrace();
410: mLastError = new Exception(mTranslator
411: .getString(SEQ_SEVERE_CANNOT_GET_SERVICE_INFO));
412: sendListError();
413:
414: return;
415: }
416:
417: MessageExchange req = MessageExchangeHelper
418: .createExchange(mCurrentService.getMep());
419:
420: try {
421: if (!MessageExchangeHelper.updateInMessage(req,
422: mLastInputMessage)) {
423: mLog.severe(mTranslator.getString(SEQ_UPDATE_NM_FAILED,
424: mCurrentService.getName()));
425: mLastError = new Exception(mTranslator
426: .getString(SEQ_UPDATE_NM_FAILED,
427: mCurrentService.getName()));
428: sendListError();
429:
430: return;
431: }
432:
433: req.setOperation(new QName(mCurrentService
434: .getOperationNamespace(), mCurrentService
435: .getOperation()));
436: req.setService(new QName(mCurrentService.getNamespace(),
437: mCurrentService.getName()));
438:
439: /**
440: * The following is the logic an intelligent component adopts to
441: * pick an endpoint. First use the service name to identify the
442: * endpoint list. Then try to see if any endpoint name matches
443: * the one configured. IF not lucky then try the same process with
444: * the interface name. Still not lucky, then our message does not
445: * have an address, so cannot send it
446: */
447: ServiceEndpoint ref = getServiceEndpoint(mCurrentService
448: .getNamespace(), mCurrentService.getName(),
449: mCurrentService.getEndpointName());
450:
451: if (ref == null) {
452: ref = getInterfaceEndpoint(mCurrentService
453: .getInterfaceNamespace(), mCurrentService
454: .getInterfaceName(), mCurrentService
455: .getEndpointName());
456: }
457:
458: /*
459: * if ref is null, it means that either the service is
460: * not activated
461: */
462: if (ref == null) {
463: mLog.info(mTranslator.getString(
464: SEQ_CANNOT_GET_ENDPOINT, mCurrentService
465: .getNamespace()
466: + mCurrentService.getName(),
467: mCurrentService.getEndpointName()));
468: mLastError = new Exception(mTranslator.getString(
469: SEQ_CANNOT_GET_ENDPOINT, mCurrentService
470: .getName()));
471: sendListError();
472:
473: return;
474: }
475:
476: req.setEndpoint(ref);
477:
478: if (!send(req, mCurrentService.getTimeout())) {
479: mLog.severe(mTranslator.getString(SEQ_SEND_FAILED));
480: sendListError();
481: }
482: } catch (Exception e) {
483: e.printStackTrace();
484: mLog.severe(mTranslator.getString(SEQ_SEND_FAILED)
485: + " "
486: + mCurrentService.getServiceReference()
487: .getServiceName().toString());
488: mLastError = e;
489:
490: sendListError();
491: }
492: }
493:
494: /**
495: * returns the value of mTimedOut.
496: *
497: * @return true if the service has timed out
498: */
499: public boolean hasTimedOut() {
500: return mTimedOut;
501: }
502:
503: /**
504: * Processes the response.
505: */
506: public void processResponse() {
507: mLog
508: .info(this
509: + mTranslator
510: .getString(SEQ_PROCESSING_RESPONSE));
511:
512: if (mCurrentExchange.getStatus() == ExchangeStatus.DONE) {
513: /* terminal condition for service in-out and in-only
514: */
515: mLog.info(mCurrentExchange.getEndpoint().getServiceName()
516: + " DONE ");
517: mServiceIndex++;
518: executeService(mServiceIndex);
519:
520: return;
521: }
522:
523: if (mCurrentExchange.getStatus() == ExchangeStatus.ERROR) {
524: mLog.info(mCurrentExchange.getEndpoint().getServiceName()
525: + " ERROR ");
526: mLastError = mCurrentExchange.getError();
527: mLastFaultMessage = mCurrentExchange.getFault();
528: sendListError();
529:
530: return;
531: }
532:
533: mLastOutputMessage = MessageExchangeHelper
534: .getOutMessage(mCurrentExchange);
535: mLastFaultMessage = mCurrentExchange.getFault();
536:
537: if (mLastOutputMessage == null) {
538: if (mLastFaultMessage != null) {
539: sendListResponse();
540: } else {
541: sendListError();
542: }
543: } else {
544: mLastInputMessage = mLastOutputMessage;
545: }
546:
547: try {
548: mCurrentExchange.setStatus(ExchangeStatus.DONE);
549: } catch (Exception e) {
550: e.printStackTrace();
551: }
552:
553: if (send(mCurrentExchange, mCurrentService.getTimeout())) {
554: mServiceIndex++;
555: executeService(mServiceIndex);
556: } else {
557: sendListError();
558: }
559:
560: mLog.fine(this + "Process response completed");
561: }
562:
563: /**
564: *
565: */
566: public void resumeTX() {
567: if (mCurrentTransaction != null) {
568: try {
569: mLog.fine(mTranslator.getString(SEQ_RESUME_TX,
570: mCurrentExchange.getExchangeId()));
571: SequencingEngineContext.getInstance()
572: .getTransactionManager().resume(
573: mCurrentTransaction);
574: } catch (javax.transaction.SystemException se) {
575: mLog.severe(mTranslator.getString(SEQ_RESUME_TX_FAILED,
576: mCurrentExchange.getExchangeId()));
577: se.printStackTrace();
578: } catch (javax.transaction.InvalidTransactionException ite) {
579: mLog.severe(mTranslator.getString(SEQ_RESUME_TX_FAILED,
580: mCurrentExchange.getExchangeId()));
581: ite.printStackTrace();
582: }
583: }
584: }
585:
586: /**
587: * sends an error message for list execution.
588: */
589: public void sendListError() {
590: mLog.severe(this + "Sending list error");
591:
592: if ((mLastError == null) && (mLastFaultMessage == null)) {
593: mLastError = new Exception("Unknown error");
594: }
595:
596: MessageExchangeHelper.updateMessage(mInboundExchange, null,
597: mLastError, mLastFaultMessage);
598: send(mInboundExchange, -1);
599: setState(ServicelistState.ERROR);
600: }
601:
602: /**
603: * sends the response for the list execution.
604: */
605: public void sendListResponse() {
606: mLog.info(this + mTranslator.getString(SEQ_SEND_LIST_RESPONSE));
607: mLastError = null;
608:
609: if ((mLastInputMessage == null) && (mLastFaultMessage == null)) {
610: mLastError = new Exception(mTranslator
611: .getString(SEQ_NULL_OUT_MESSAGE));
612: }
613:
614: MessageExchangeHelper.updateMessage(mInboundExchange,
615: mLastOutputMessage, mLastError, mLastFaultMessage);
616: send(mInboundExchange, -1);
617: setState(ServicelistState.COMPLETED);
618: }
619:
620: /**
621: * starts the timer.
622: *
623: * @param timeout timeout , if -1 then timer is not started
624: */
625: public void startTimer(long timeout) {
626: mTimer.cancel();
627: mTimer = new Timer();
628: mTimedOut = false;
629:
630: if (timeout != 0) {
631: mTimer.schedule(new ReminderTask(), timeout);
632: }
633: }
634:
635: /**
636: * helper to get the interface endpoint for a service.
637: *
638: * @param namespace service namespace
639: * @param name service name
640: * @param epname endpoint name
641: *
642: * @return ServiceEndpoint
643: */
644: private ServiceEndpoint getInterfaceEndpoint(String namespace,
645: String name, String epname) {
646: ServiceEndpoint ep = null;
647:
648: try {
649: ServiceEndpoint[] ref = SequencingEngineContext
650: .getInstance().getContext().getEndpoints(
651: new QName(namespace, name));
652:
653: if ((ref == null) || (ref.length == 0)) {
654: return null;
655: } else {
656: ep = ref[0];
657:
658: for (int k = 0; k < ref.length; k++) {
659: if (ref[k].getEndpointName().trim().equals(
660: epname.trim())) {
661: ep = ref[k];
662: }
663: }
664: }
665: } catch (Exception e) {
666: mLog.info(mTranslator.getString(SEQ_CANNOT_GET_ENDPOINT,
667: namespace, name));
668: e.printStackTrace();
669:
670: return null;
671: }
672:
673: return ep;
674: }
675:
676: /**
677: * helper to get the local endpoint for a service.
678: *
679: * @param namespace service namespace
680: * @param name service name
681: * @param epname endpoint name
682: *
683: * @return ServiceEndpoint
684: */
685: private ServiceEndpoint getServiceEndpoint(String namespace,
686: String name, String epname) {
687: ServiceEndpoint ep = null;
688:
689: try {
690: ServiceEndpoint[] ref = SequencingEngineContext
691: .getInstance().getContext().getEndpointsForService(
692: new QName(namespace, name));
693:
694: if ((ref == null) || (ref.length == 0)) {
695: return null;
696: } else {
697: ep = ref[0];
698:
699: for (int k = 0; k < ref.length; k++) {
700: if (ref[k].getEndpointName().trim().equals(
701: epname.trim())) {
702: ep = ref[k];
703: }
704: }
705: }
706: } catch (Exception e) {
707: mLog.info(mTranslator.getString(SEQ_CANNOT_GET_ENDPOINT,
708: namespace, name));
709: e.printStackTrace();
710:
711: return null;
712: }
713:
714: return ep;
715: }
716:
717: /**
718: * Processed any request exchanges.
719: */
720: private void processRequest() {
721: if (mCurrentExchange.getStatus() == ExchangeStatus.DONE) {
722: mLog.info(this .toString()
723: + mTranslator.getString(SEQ_LIST_COMPLETED,
724: this .mListBean.getServicename()));
725: mState = ServicelistState.COMPLETED;
726:
727: return;
728: }
729:
730: if (mCurrentExchange.getStatus() == ExchangeStatus.ERROR) {
731: mLog.severe(mTranslator.getString(SEQ_LIST_COMPLETED_ERROR,
732: this .mListBean.getServicename()));
733: mState = ServicelistState.ERROR;
734:
735: return;
736: }
737:
738: mInboundExchange = mCurrentExchange;
739: mLastInputMessage = MessageExchangeHelper
740: .getInMessage(mCurrentExchange);
741:
742: if (mLastInputMessage == null) {
743: mLog.severe(mTranslator.getString(SEQ_NO_INPUT_MESSAGE,
744: this .mListBean.getServicename()));
745: MessageExchangeHelper.updateMessage(mCurrentExchange, null,
746: new Exception(mTranslator.getString(
747: SEQ_NO_INPUT_MESSAGE, this .mListBean
748: .getServicename())), null);
749: send(mCurrentExchange, -1);
750: mState = ServicelistState.ERROR;
751:
752: return;
753: }
754:
755: executeService(mServiceIndex);
756: }
757:
758: /**
759: * Registers the message exchange.
760: *
761: * @param msg ME
762: */
763: private void registerExchange(MessageExchange msg) {
764: String reqid = (String) msg
765: .getProperty(ConfigData.REQUEST_SEQ_ID);
766: String respid = (String) msg
767: .getProperty(ConfigData.RESPONSE_SEQ_ID);
768:
769: if (respid != null) {
770: mMessageRegistry.registerExchange(msg.getExchangeId()
771: + respid, this );
772: } else if (reqid != null) {
773: mMessageRegistry.registerExchange(msg.getExchangeId()
774: + reqid, this );
775: } else {
776: mLog.severe("Registration info missing");
777: }
778: }
779:
780: /**
781: * Resets the timer.
782: */
783: private void resetTimer() {
784: if (mTimer != null) {
785: mTimer.cancel();
786: }
787: }
788:
789: /**
790: * Method that sends any message exchnage on the channel.
791: *
792: * @param msg exchange to be sent
793: * @param timeout timeout for that exchange
794: *
795: * @return true if success else false
796: */
797: private boolean send(MessageExchange msg, long timeout) {
798: /*
799: * Send message mSendMessage
800: * register in message registry
801: * start timer -1 indicates no timer
802: */
803: if (msg == null) {
804: mLog.severe(SEQ_NULL_EXCHANGE);
805:
806: return false;
807: }
808:
809: try {
810: mLog.info(this .toString()
811: + mTranslator.getString(SEQ_FINE_SEND_MESSAGE, msg
812: .getEndpoint().getServiceName()));
813:
814: if ((msg.getStatus() == ExchangeStatus.DONE)
815: || (msg.getStatus() == ExchangeStatus.ERROR)) {
816: mLog.info(this .toString()
817: + mTranslator.getString(SEQ_FINE_SEND_ACTIVE,
818: msg.getStatus().toString()));
819: mResponseExpected = false;
820: } else {
821: registerExchange(msg);
822: setState(ServicelistState.WAITING);
823: mResponseExpected = true;
824: }
825:
826: mCurrentExchange = msg;
827: msg.setProperty(
828: MessageExchange.JTA_TRANSACTION_PROPERTY_NAME,
829: mCurrentTransaction);
830: resumeTX();
831: mChannel.send(msg);
832: } catch (MessagingException me) {
833: mLog.severe(mTranslator.getString(SEQ_SEND_FAILED));
834: mLog.severe(me.getMessage());
835: mLastError = me;
836: me.printStackTrace();
837: mResponseExpected = false;
838: setState(ServicelistState.ERROR);
839:
840: return false;
841: }
842:
843: if ((mResponseExpected) && (timeout != -1)) {
844: startTimer(timeout);
845: }
846:
847: return true;
848: }
849:
850: /**
851: * This is a reminder task for the timer.
852: *
853: * @author Ramesh
854: */
855: public class ReminderTask extends TimerTask {
856: /**
857: * run method for the thread.
858: */
859: public void run() {
860: /* call some method
861: * that will deregister the message from
862: * message registry and return an error response
863: */
864: mTimedOut = true;
865: cancelExchange();
866: }
867: }
868: }
|