001: // Copyright (c) 2004-2005 Sun Microsystems Inc., All Rights Reserved.
002:
003: /*
004: * MessageReceiver.java
005: *
006: * SUN PROPRIETARY/CONFIDENTIAL.
007: * This software is the proprietary information of Sun Microsystems, Inc.
008: * Use is subject to license terms.
009: *
010: */
011: package com.sun.jbi.engine.sequencing;
012:
013: import com.sun.jbi.engine.sequencing.SequencingEngineContext;
014: import com.sun.jbi.engine.sequencing.framework.Servicelist;
015: import com.sun.jbi.engine.sequencing.framework.threads.WorkManager;
016: import com.sun.jbi.engine.sequencing.util.StringTranslator;
017:
018: import java.util.logging.Logger;
019:
020: import javax.jbi.messaging.DeliveryChannel;
021: import javax.jbi.messaging.MessageExchange;
022:
023: /**
024: * MessageReceiver
025: *
026: * @author Sun Microsystems, Inc.
027: */
028: class MessageReceiver implements Runnable, SequencingEngineResources {
029: /**
030: * Time out for receive
031: */
032: private static final long TIME_OUT = 500;
033:
034: /**
035: * EngineChannel object
036: */
037: private DeliveryChannel mChannel;
038:
039: /**
040: * Logger Object
041: */
042: private Logger mLog;
043:
044: /**
045: * Normalized Message
046: */
047: private MessageExchange mExchange;
048:
049: /**
050: * Message processor
051: */
052: private MessageProcessor mMessageProcessor;
053:
054: /**
055: * a Monitor Object to stop the thread.
056: */
057: private Object mMonitor;
058:
059: /**
060: * Translator object for internationalization.
061: */
062: private StringTranslator mTranslator;
063:
064: /**
065: * Work Manager, which maintians thread pool.
066: */
067: private WorkManager mWorkManager;
068:
069: /**
070: * Creates the MessageReceiver Thread.
071: *
072: * @param bc delivery channel
073: */
074: public MessageReceiver(DeliveryChannel bc) {
075: mLog = SequencingEngineContext.getInstance().getLogger();
076: mChannel = bc;
077: mWorkManager = WorkManager.getWorkManager("SEQ_RECEIVER");
078: mMessageProcessor = new MessageProcessor(mChannel);
079: mMonitor = new Object();
080: mTranslator = new StringTranslator();
081: }
082:
083: /**
084: * Blocking call on the service channel to receive the message.
085: */
086: public void run() {
087: mLog.info(mTranslator.getString(SEQ_RECEIVER_START));
088: mWorkManager.start();
089:
090: while (mMonitor != null) {
091: try {
092: mExchange = mChannel.accept(TIME_OUT);
093:
094: if (mExchange != null) {
095: Servicelist list = null;
096:
097: mMessageProcessor.setExchange(mExchange);
098: list = mMessageProcessor.process();
099:
100: if ((list == null) || (!mMessageProcessor.valid())) {
101: mLog.severe(mTranslator.getString(
102: SEQ_INVALID_MESSAGE, mExchange
103: .getExchangeId()));
104:
105: continue;
106: }
107:
108: if (!mWorkManager.processCommand(list)) {
109: mLog.info(mTranslator
110: .getString(SEQ_NO_FREE_THREAD));
111: }
112: }
113: } catch (Exception e) {
114: mLog.severe(mTranslator.getString(SEQ_RECEIVER_ERROR));
115: e.printStackTrace();
116: mWorkManager.cease();
117:
118: return;
119: }
120: }
121:
122: mWorkManager.cease();
123: mLog.info(mTranslator.getString(SEQ_RECEIVER_STOPPED));
124: }
125:
126: /**
127: * Stops new requests for this engine.
128: */
129: public void stopNewRequests() {
130: mMessageProcessor.stopNewRequests();
131: }
132:
133: /**
134: * Stops the receiving thread.
135: */
136: public void stopReceiving() {
137: mLog.info(mTranslator.getString(SEQ_RECEIVER_STOP));
138:
139: // Close the Receiver now.
140: mMonitor = null;
141: }
142: }
|