001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)MessageReceiver.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.engine.sequencing;
030:
031: import com.sun.jbi.engine.sequencing.SequencingEngineContext;
032: import com.sun.jbi.engine.sequencing.framework.Servicelist;
033: import com.sun.jbi.engine.sequencing.framework.threads.WorkManager;
034: import com.sun.jbi.engine.sequencing.util.StringTranslator;
035:
036: import java.util.logging.Logger;
037:
038: import javax.jbi.messaging.DeliveryChannel;
039: import javax.jbi.messaging.MessageExchange;
040:
041: /**
042: * MessageReceiver
043: *
044: * @author Sun Microsystems, Inc.
045: */
046: class MessageReceiver implements Runnable, SequencingEngineResources {
047: /**
048: * Time out for receive
049: */
050: private static final long TIME_OUT = 500;
051:
052: /**
053: * EngineChannel object
054: */
055: private DeliveryChannel mChannel;
056:
057: /**
058: * Logger Object
059: */
060: private Logger mLog;
061:
062: /**
063: * Normalized Message
064: */
065: private MessageExchange mExchange;
066:
067: /**
068: * Message processor
069: */
070: private MessageProcessor mMessageProcessor;
071:
072: /**
073: * a Monitor Object to stop the thread.
074: */
075: private Object mMonitor;
076:
077: /**
078: * Translator object for internationalization.
079: */
080: private StringTranslator mTranslator;
081:
082: /**
083: * Work Manager, which maintians thread pool.
084: */
085: private WorkManager mWorkManager;
086:
087: /**
088: * Creates the MessageReceiver Thread.
089: *
090: * @param bc delivery channel
091: */
092: public MessageReceiver(DeliveryChannel bc) {
093: mLog = SequencingEngineContext.getInstance().getLogger();
094: mChannel = bc;
095: mWorkManager = WorkManager.getWorkManager("SEQ_RECEIVER");
096: mMessageProcessor = new MessageProcessor(mChannel);
097: mMonitor = new Object();
098: mTranslator = new StringTranslator();
099: }
100:
101: /**
102: * Blocking call on the service channel to receive the message.
103: */
104: public void run() {
105: mLog.info(mTranslator.getString(SEQ_RECEIVER_START));
106: mWorkManager.start();
107:
108: while (mMonitor != null) {
109: try {
110: mExchange = mChannel.accept(TIME_OUT);
111:
112: if (mExchange != null) {
113: Servicelist list = null;
114:
115: mMessageProcessor.setExchange(mExchange);
116: list = mMessageProcessor.process();
117:
118: if ((list == null) || (!mMessageProcessor.valid())) {
119: mLog.severe(mTranslator.getString(
120: SEQ_INVALID_MESSAGE, mExchange
121: .getExchangeId()));
122:
123: continue;
124: }
125:
126: if (!mWorkManager.processCommand(list)) {
127: mLog.info(mTranslator
128: .getString(SEQ_NO_FREE_THREAD));
129: }
130: }
131: } catch (Exception e) {
132: mLog.severe(mTranslator.getString(SEQ_RECEIVER_ERROR));
133: e.printStackTrace();
134: mWorkManager.cease();
135:
136: return;
137: }
138: }
139:
140: mWorkManager.cease();
141: mLog.info(mTranslator.getString(SEQ_RECEIVER_STOPPED));
142: }
143:
144: /**
145: * Stops new requests for this engine.
146: */
147: public void stopNewRequests() {
148: mMessageProcessor.stopNewRequests();
149: }
150:
151: /**
152: * Stops the receiving thread.
153: */
154: public void stopReceiving() {
155: mLog.info(mTranslator.getString(SEQ_RECEIVER_STOP));
156:
157: // Close the Receiver now.
158: mMonitor = null;
159: }
160: }
|