001: // Copyright (c) 2004-2005 Sun Microsystems Inc., All Rights Reserved.
002:
003: /*
004: * FileThreads.java
005: *
006: * SUN PROPRIETARY/CONFIDENTIAL.
007: * This software is the proprietary information of Sun Microsystems, Inc.
008: * Use is subject to license terms.
009: *
010: */
011: package com.sun.jbi.binding.file;
012:
013: import com.sun.jbi.binding.file.FileBindingContext;
014: import com.sun.jbi.binding.file.framework.WorkManager;
015: import com.sun.jbi.binding.file.util.ConfigData;
016: import com.sun.jbi.binding.file.util.FileBindingUtil;
017: import com.sun.jbi.binding.file.util.FileListing;
018: import com.sun.jbi.binding.file.util.InputFileFilter;
019: import com.sun.jbi.binding.file.util.StringTranslator;
020: import com.sun.jbi.binding.file.util.UtilBase;
021:
022: import java.io.File;
023:
024: import java.util.HashMap;
025: import java.util.Iterator;
026: import java.util.List;
027: import java.util.logging.Logger;
028:
029: import javax.jbi.component.ComponentContext;
030: import javax.jbi.servicedesc.ServiceEndpoint;
031:
032: import javax.xml.namespace.QName;
033:
034: /**
035: * The File Threads class processes the requests by polling at the directory
036: * specified in deployment artifact file. For each file to be processed, it
037: * spawns a thread and processes it.
038: *
039: * @author Sun Microsystems, Inc.
040: */
041: public class FileThreads extends UtilBase implements Runnable,
042: FileBindingResources {
043: /**
044: * Default min threads.
045: */
046: private static final int DEFAULT_MAX_THREADS = 10;
047:
048: /**
049: * Default max threads.
050: */
051: private static final int DEFAULT_MIN_THREADS = 5;
052:
053: /**
054: * Wait time for polling the read directory.
055: */
056: private static final int WAIT_TIME = 50;
057:
058: /**
059: * Component Context.
060: */
061: private ComponentContext mContext;
062:
063: /**
064: * Endpoint attributes object.
065: */
066: private EndpointBean mBean;
067:
068: /**
069: * Folder name.
070: */
071: private File mDirName;
072:
073: /**
074: * Logger Object
075: */
076: private Logger mLog;
077:
078: /**
079: * Monitor Object
080: */
081: private Object mMonitor;
082:
083: /**
084: * Helper for i18n.
085: */
086: private StringTranslator mTranslator;
087:
088: /**
089: * The ThreadGroup for the Endpoint.
090: */
091: private WorkManager mWorkManager = null;
092:
093: /**
094: * Flag to hold the state of this thread.
095: */
096: private boolean mRunning;
097:
098: /**
099: * Last bacth size
100: */
101: private int mLastBatchSize;
102:
103: /**
104: * Constructor which sets the Normlized message object, Required to create
105: * messages every time we send a message.
106: *
107: * @param endpointInfo end point bean object
108: */
109: public FileThreads(EndpointBean endpointInfo) {
110: mLog = FileBindingContext.getInstance().getLogger();
111: mTranslator = new StringTranslator();
112: mBean = endpointInfo;
113: mContext = FileBindingContext.getInstance().getContext();
114:
115: String endptname = endpointInfo.getUniqueName();
116: mWorkManager = WorkManager.getWorkManager(endptname);
117: mWorkManager.setMinThreads(DEFAULT_MIN_THREADS);
118: mWorkManager.setMaxThreads(DEFAULT_MAX_THREADS);
119: mMonitor = new Object();
120: }
121:
122: /**
123: * Returns the EndpointBean.
124: *
125: * @return endpoitn bean object.
126: */
127: public EndpointBean getBean() {
128: return mBean;
129: }
130:
131: /**
132: * MEhtod to check if the thread is still executing.
133: *
134: * @return true if running false otherwise.
135: */
136: public boolean isRunning() {
137: return mRunning;
138: }
139:
140: /**
141: * Implemented as a thread because it has to poll a folder continuosly.
142: */
143: public void run() {
144: mRunning = true;
145: mLastBatchSize = 0;
146:
147: if (!mRunning) {
148: return;
149: }
150:
151: HashMap listFiles = null;
152: InputFileFilter filefilter = new InputFileFilter();
153: filefilter.setFilterexpression(mBean
154: .getValue(ConfigData.INPUTPATTERN));
155: mWorkManager.start();
156:
157: String nam = (String) mBean.getValue(ConfigData.INPUTDIR);
158:
159: if (nam != null) {
160: mDirName = new File(nam);
161: } else {
162: mRunning = false;
163: mLog.severe(mTranslator.getString(FBC_STOP_POLLING, mBean
164: .getUniqueName()));
165:
166: return;
167: }
168:
169: while (mMonitor != null) {
170: try {
171: listFiles = getFiles(filefilter);
172:
173: if (listFiles == null) {
174: mWorkManager.cease();
175: mLog.severe(mTranslator.getString(FBC_STOP_POLLING,
176: mBean.getUniqueName()));
177:
178: return;
179: }
180:
181: if (listFiles.isEmpty()) {
182: gotoSleep();
183:
184: continue;
185: }
186:
187: Iterator listIterator = listFiles.keySet().iterator();
188: mLastBatchSize = listFiles.size();
189:
190: while (listIterator.hasNext()) {
191: File f = (File) listIterator.next();
192:
193: if ((f.canWrite()) && (!f.isHidden())) {
194: QName oper = (QName) listFiles.get(f);
195: String trk = FileBindingUtil.getTrackingId();
196: FileBindingUtil.addEntry(trk, f
197: .getAbsolutePath());
198: mWorkManager.processCommand(new FileCommand(f,
199: mBean, oper, trk));
200: } else {
201: mLog.severe(mTranslator.getString(
202: FBC_CANNOT_PROCESS_FILE, f
203: .getAbsolutePath()));
204: }
205: }
206:
207: mWorkManager.finishWork();
208: listFiles.clear();
209: } catch (Exception we) {
210: we.printStackTrace();
211:
212: continue;
213: }
214: }
215:
216: /*
217: *Destroy, the thread pool, when needed to stop the FBC
218: */
219: mWorkManager.cease();
220: mLog.severe(mTranslator.getString(FBC_STOP_POLLING, mBean
221: .getUniqueName()));
222: mRunning = false;
223: }
224:
225: /**
226: * Stops the File thread.
227: */
228: public void stopAll() {
229: mLog.info(mTranslator.getString(FBC_STOP_POLLING, mBean
230: .getUniqueName()));
231: mMonitor = null;
232: }
233:
234: /**
235: * Gets a list of files from all the folders that need to be checked.
236: *
237: * @param filter filter pattern for reading files.
238: *
239: * @return List of file operation combinations.
240: */
241: private HashMap getFiles(InputFileFilter filter) {
242: HashMap filesmap = new HashMap();
243:
244: try {
245: addFiles(filesmap, mDirName, mBean.getDefaultOperation(),
246: filter);
247:
248: for (int i = 0; i < mBean.getOperationsCount(); i++) {
249: File newdir = new File(mDirName.getAbsolutePath()
250: + File.separatorChar
251: + mBean.getOperationQName(i).getLocalPart());
252:
253: addFiles(filesmap, newdir, mBean.getOperationQName(i),
254: filter);
255: }
256: } catch (Exception e) {
257: e.printStackTrace();
258: }
259:
260: return filesmap;
261: }
262:
263: /**
264: * Util method for adding files into list.
265: *
266: * @param map List into which files have to be added
267: * @param newdir folder to check.
268: * @param oper operatio name.
269: * @param fil filter.
270: */
271: private void addFiles(HashMap map, File newdir, QName oper,
272: InputFileFilter fil) {
273: List l = FileListing.getFileListing(newdir, false, fil);
274:
275: try {
276: if ((l != null) && (l.size() > 0)) {
277: for (int j = 0; j < l.size(); j++) {
278: File f = (File) l.get(j);
279: map.put(f, oper);
280: }
281: }
282: } catch (Exception e) {
283: e.printStackTrace();
284: }
285: }
286:
287: /**
288: * Sleep logic.
289: */
290: private void gotoSleep() {
291: try {
292: /**
293: * If we find less files in the last iteration then we sleep more,
294: * else we sleep less 0 files mean max sleep time of 500ms
295: * BATCH_SIZE files mean least sleep time of 0 sec
296: */
297: Thread.sleep((ConfigData.BATCH_SIZE - mLastBatchSize)
298: * WAIT_TIME);
299: } catch (Exception ee) {
300: //ee.printStackTrace();
301: }
302: }
303: }
|