001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)JMSReceiver.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.jms;
030:
import com.sun.jbi.StringTranslator;

import com.sun.jbi.binding.jms.framework.WorkManager;

import com.sun.jbi.binding.jms.config.ConfigConstants;
import com.sun.jbi.binding.jms.handler.MessageHandler;

import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.MessageExchange;
042:
043: /**
044: * The JMS Reciever class receives the Normalized Message from the NMS and
045: * writes the contents to the directory specified in endpoints.xml.
046: *
047: * @author Sun Microsystems Inc.
048: */
049: class JMSReceiver implements Runnable, JMSBindingResources {
050: /**
051: * Receiver work manager.
052: */
053: private static final String RECEIVER = "RECEIVER";
054: /**
055: * Channel.
056: */
057: private DeliveryChannel mChannel;
058:
059: /**
060: * Logger Object.
061: */
062: private Logger mLogger;
063:
064: /**
065: * Normalized Message.
066: */
067: private MessageExchange mExchange;
068:
069: /**
070: * Message processor.
071: */
072: private MessageProcessor mProcessor;
073:
074: /**
075: * Helper for i18n.
076: */
077: private StringTranslator mStringTranslator;
078:
079: /**
080: * Work Manager.
081: */
082: private WorkManager mWorkManager;
083:
084: /**
085: * Creates the JMSReceiver Thread.
086: *
087: * @param bc Thread group for this receiver
088: */
089: public JMSReceiver(DeliveryChannel bc) {
090: mLogger = JMSBindingContext.getInstance().getLogger();
091: mStringTranslator = JMSBindingContext.getInstance()
092: .getStringTranslator();
093: mChannel = bc;
094: mWorkManager = WorkManager
095: .getWorkManager(ConfigConstants.NMS_WORK_MANAGER);
096: mWorkManager.setMinThreads(JMSBindingContext.getInstance()
097: .getConfig().getMinThreadCount());
098: mWorkManager.setMaxThreads(JMSBindingContext.getInstance()
099: .getConfig().getMaxThreadCount());
100: mProcessor = new MessageProcessor(bc);
101: }
102:
103: /**
104: * Blocking call on the service channel to receive the message. Right now,
105: * we wait for 10 seconds. Can be made to configure.
106: */
107: public void run() {
108: mLogger.info(mStringTranslator.getString(JMS_RECEIVER_START));
109: mWorkManager.start();
110: mLogger.info("Started the work manager in JMSReceiver");
111: while (true) {
112: try {
113: mExchange = mChannel.accept();
114: if (mExchange != null) {
115: MessageHandler handler = null;
116: mProcessor.setExchange(mExchange);
117: mLogger.info("Received the message "
118: + mExchange.getExchangeId());
119: handler = mProcessor.process();
120:
121: if (handler == null) {
122: mLogger.severe(mStringTranslator.getString(
123: JMS_INVALID_MESSAGE, mExchange
124: .getExchangeId()));
125:
126: continue;
127: }
128:
129: if (!mWorkManager.processCommand(handler)) {
130: mLogger.info(mStringTranslator
131: .getString(JMS_NO_FREE_THREAD));
132: }
133: }
134: } catch (Exception e) {
135: mLogger.info(e.getMessage());
136: mWorkManager.cease();
137:
138: return;
139: }
140: }
141: }
142:
143: /**
144: * Stops the receiving thread.
145: */
146: public void stopReceiving() {
147: mLogger.info(mStringTranslator.getString(JMS_RECEIVER_STOP));
148:
149: }
150: }
|