001: // Copyright (c) 2004-2005 Sun Microsystems Inc., All Rights Reserved.
002:
003: /*
004: * FileReceiver.java
005: *
006: * SUN PROPRIETARY/CONFIDENTIAL.
007: * This software is the proprietary information of Sun Microsystems, Inc.
008: * Use is subject to license terms.
009: *
010: */
011: package com.sun.jbi.binding.file;
012:
013: import com.sun.jbi.binding.file.FileBindingContext;
014: import com.sun.jbi.binding.file.framework.WorkManager;
015: import com.sun.jbi.binding.file.util.StringTranslator;
016:
017: import java.util.logging.Logger;
018:
019: import javax.jbi.messaging.DeliveryChannel;
020: import javax.jbi.messaging.MessageExchange;
021:
022: /**
023: * The File Reciever class receives the Normalized Message from the NMR and
024: * writes the contents to the directory specified in file.xml
025: *
026: * @author Sun Microsystems, Inc.
027: */
028: class FileReceiver implements Runnable, FileBindingResources {
029: /**
030: * Time out for receive
031: */
032: private static final long TIME_OUT = 500;
033:
034: /**
035: * Wait time for thread.
036: */
037: private static final long WAIT_TIME = 100;
038:
039: /**
040: * Receiver.
041: */
042: private static final String RECEIVER = "RECEIVER";
043:
044: /**
045: * Binding channel.
046: */
047: private DeliveryChannel mChannel;
048:
049: /**
050: * Logger Object
051: */
052: private Logger mLog;
053:
054: /**
055: * Normalized Message
056: */
057: private MessageExchange mExchange;
058:
059: /**
060: * Monitor Object to stop the thread.
061: */
062: private Object mMonitor;
063:
064: /**
065: * Helper for i18n.
066: */
067: private StringTranslator mTranslator;
068:
069: /**
070: * Thread framework object.
071: */
072: private WorkManager mWorkManager;
073:
074: /**
075: * Creates the FileReceiver Thread.
076: *
077: * @param bc Thread group for this receiver
078: */
079: public FileReceiver(DeliveryChannel bc) {
080: mLog = FileBindingContext.getInstance().getLogger();
081: mTranslator = new StringTranslator();
082: mChannel = bc;
083: mWorkManager = WorkManager.getWorkManager(RECEIVER);
084: mMonitor = new Object();
085: }
086:
087: /**
088: * Blocking call on the service channel to receive the message.
089: */
090: public void run() {
091: mLog.info(mTranslator.getString(FBC_RECEIVER_START));
092: mWorkManager.start();
093:
094: while (mMonitor != null) {
095: try {
096: mExchange = mChannel.accept(TIME_OUT);
097:
098: if (mExchange != null) {
099: MessageProcessor proc = new MessageProcessor(
100: mChannel, mExchange);
101: mLog.info(mTranslator
102: .getString(FBC_MESSAGE_RECEIVED));
103:
104: if (!mWorkManager.processCommand(proc)) {
105: mLog.info(mTranslator
106: .getString(FBC_NO_FREE_THREAD));
107: }
108: }
109: } catch (Exception e) {
110: mLog.info(mTranslator.getString(FBC_RECEIVER_ERROR));
111: mLog.severe(e.getMessage());
112: mWorkManager.cease();
113:
114: return;
115: }
116: }
117: }
118:
119: /**
120: * Stops the receiving thread.
121: */
122: public void stopReceiving() {
123: mLog.info(mTranslator.getString(FBC_RECEIVER_STOP));
124: mMonitor = null;
125: }
126: }
|