001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)FileThreads.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.file;
030:
031: import com.sun.jbi.binding.file.FileBindingContext;
032: import com.sun.jbi.binding.file.framework.WorkManager;
033: import com.sun.jbi.binding.file.util.ConfigData;
034: import com.sun.jbi.binding.file.util.FileBindingUtil;
035: import com.sun.jbi.binding.file.util.FileListing;
036: import com.sun.jbi.binding.file.util.InputFileFilter;
037: import com.sun.jbi.binding.file.util.StringTranslator;
038: import com.sun.jbi.binding.file.util.UtilBase;
039:
040: import java.io.File;
041:
042: import java.util.HashMap;
043: import java.util.Iterator;
044: import java.util.List;
045: import java.util.logging.Logger;
046:
047: import javax.jbi.component.ComponentContext;
048: import javax.jbi.servicedesc.ServiceEndpoint;
049:
050: import javax.xml.namespace.QName;
051:
052: /**
053: * The File Threads class processes the requests by polling at the directory
054: * specified in deployment artifact file. For each file to be processed, it
055: * spawns a thread and processes it.
056: *
057: * @author Sun Microsystems, Inc.
058: */
059: public class FileThreads extends UtilBase implements Runnable,
060: FileBindingResources {
061: /**
062: * Default min threads.
063: */
064: private static final int DEFAULT_MAX_THREADS = 10;
065:
066: /**
067: * Default max threads.
068: */
069: private static final int DEFAULT_MIN_THREADS = 5;
070:
071: /**
072: * Wait time for polling the read directory.
073: */
074: private static final int WAIT_TIME = 50;
075:
076: /**
077: * Component Context.
078: */
079: private ComponentContext mContext;
080:
081: /**
082: * Endpoint attributes object.
083: */
084: private EndpointBean mBean;
085:
086: /**
087: * Folder name.
088: */
089: private File mDirName;
090:
091: /**
092: * Logger Object
093: */
094: private Logger mLog;
095:
096: /**
097: * Monitor Object
098: */
099: private Object mMonitor;
100:
101: /**
102: * Helper for i18n.
103: */
104: private StringTranslator mTranslator;
105:
106: /**
107: * The ThreadGroup for the Endpoint.
108: */
109: private WorkManager mWorkManager = null;
110:
111: /**
112: * Flag to hold the state of this thread.
113: */
114: private boolean mRunning;
115:
116: /**
117: * Last bacth size
118: */
119: private int mLastBatchSize;
120:
121: /**
122: * Constructor which sets the Normlized message object, Required to create
123: * messages every time we send a message.
124: *
125: * @param endpointInfo end point bean object
126: */
127: public FileThreads(EndpointBean endpointInfo) {
128: mLog = FileBindingContext.getInstance().getLogger();
129: mTranslator = new StringTranslator();
130: mBean = endpointInfo;
131: mContext = FileBindingContext.getInstance().getContext();
132:
133: String endptname = endpointInfo.getUniqueName();
134: mWorkManager = WorkManager.getWorkManager(endptname);
135: mWorkManager.setMinThreads(DEFAULT_MIN_THREADS);
136: mWorkManager.setMaxThreads(DEFAULT_MAX_THREADS);
137: mMonitor = new Object();
138: }
139:
140: /**
141: * Returns the EndpointBean.
142: *
143: * @return endpoitn bean object.
144: */
145: public EndpointBean getBean() {
146: return mBean;
147: }
148:
149: /**
150: * MEhtod to check if the thread is still executing.
151: *
152: * @return true if running false otherwise.
153: */
154: public boolean isRunning() {
155: return mRunning;
156: }
157:
158: /**
159: * Implemented as a thread because it has to poll a folder continuosly.
160: */
161: public void run() {
162: mRunning = true;
163: mLastBatchSize = 0;
164:
165: if (!mRunning) {
166: return;
167: }
168:
169: HashMap listFiles = null;
170: InputFileFilter filefilter = new InputFileFilter();
171: filefilter.setFilterexpression(mBean
172: .getValue(ConfigData.INPUTPATTERN));
173: mWorkManager.start();
174:
175: String nam = (String) mBean.getValue(ConfigData.INPUTDIR);
176:
177: if (nam != null) {
178: mDirName = new File(nam);
179: } else {
180: mRunning = false;
181: mLog.info(mTranslator.getString(FBC_STOP_POLLING, mBean
182: .getUniqueName()));
183:
184: return;
185: }
186:
187: while (mMonitor != null) {
188: try {
189: listFiles = getFiles(filefilter);
190:
191: if (listFiles == null) {
192: mWorkManager.cease();
193: mLog.info(mTranslator.getString(FBC_STOP_POLLING,
194: mBean.getUniqueName()));
195:
196: return;
197: }
198:
199: if (listFiles.isEmpty()) {
200: gotoSleep();
201:
202: continue;
203: }
204:
205: Iterator listIterator = listFiles.keySet().iterator();
206: mLastBatchSize = listFiles.size();
207:
208: while (listIterator.hasNext()) {
209: File f = (File) listIterator.next();
210:
211: if ((f.canWrite()) && (!f.isHidden())) {
212: QName oper = (QName) listFiles.get(f);
213: String trk = FileBindingUtil.getTrackingId();
214: FileBindingUtil.addEntry(trk, f
215: .getAbsolutePath());
216: mWorkManager.processCommand(new FileCommand(f,
217: mBean, oper, trk));
218: } else {
219: mLog.severe(mTranslator.getString(
220: FBC_CANNOT_PROCESS_FILE, f
221: .getAbsolutePath()));
222: }
223: }
224:
225: mWorkManager.finishWork();
226: listFiles.clear();
227: } catch (Exception we) {
228: we.printStackTrace();
229:
230: continue;
231: }
232: }
233:
234: /*
235: *Destroy, the thread pool, when needed to stop the FBC
236: */
237: mWorkManager.cease();
238: mLog.info(mTranslator.getString(FBC_STOP_POLLING, mBean
239: .getUniqueName()));
240: mRunning = false;
241: }
242:
243: /**
244: * Stops the File thread.
245: */
246: public void stopAll() {
247: mLog.info(mTranslator.getString(FBC_STOP_POLLING, mBean
248: .getUniqueName()));
249: mMonitor = null;
250: }
251:
252: /**
253: * Gets a list of files from all the folders that need to be checked.
254: *
255: * @param filter filter pattern for reading files.
256: *
257: * @return List of file operation combinations.
258: */
259: private HashMap getFiles(InputFileFilter filter) {
260: HashMap filesmap = new HashMap();
261:
262: try {
263: addFiles(filesmap, mDirName, mBean.getDefaultOperation(),
264: filter);
265:
266: for (int i = 0; i < mBean.getOperationsCount(); i++) {
267: File newdir = new File(mDirName.getAbsolutePath()
268: + File.separatorChar
269: + mBean.getOperationQName(i).getLocalPart());
270:
271: addFiles(filesmap, newdir, mBean.getOperationQName(i),
272: filter);
273: }
274: } catch (Exception e) {
275: e.printStackTrace();
276: }
277:
278: return filesmap;
279: }
280:
281: /**
282: * Util method for adding files into list.
283: *
284: * @param map List into which files have to be added
285: * @param newdir folder to check.
286: * @param oper operatio name.
287: * @param fil filter.
288: */
289: private void addFiles(HashMap map, File newdir, QName oper,
290: InputFileFilter fil) {
291: List l = FileListing.getFileListing(newdir, false, fil);
292:
293: try {
294: if ((l != null) && (l.size() > 0)) {
295: for (int j = 0; j < l.size(); j++) {
296: File f = (File) l.get(j);
297: map.put(f, oper);
298: }
299: }
300: } catch (Exception e) {
301: e.printStackTrace();
302: }
303: }
304:
305: /**
306: * Sleep logic.
307: */
308: private void gotoSleep() {
309: try {
310: /**
311: * If we find less files in the last iteration then we sleep more,
312: * else we sleep less 0 files mean max sleep time of 500ms
313: * BATCH_SIZE files mean least sleep time of 0 sec
314: */
315: Thread.sleep((ConfigData.BATCH_SIZE - mLastBatchSize)
316: * WAIT_TIME);
317: } catch (Exception ee) {
318: //ee.printStackTrace();
319: }
320: }
321: }
|