001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.components.file;
018:
019: import org.apache.commons.logging.Log;
020: import org.apache.commons.logging.LogFactory;
021: import org.apache.servicemix.components.util.DefaultFileMarshaler;
022: import org.apache.servicemix.components.util.FileMarshaler;
023: import org.apache.servicemix.components.util.PollingComponentSupport;
024: import org.apache.servicemix.jbi.util.FileUtil;
025:
026: import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
027:
028: import javax.jbi.JBIException;
029: import javax.jbi.management.DeploymentException;
030: import javax.jbi.messaging.InOnly;
031: import javax.jbi.messaging.NormalizedMessage;
032: import java.io.BufferedInputStream;
033: import java.io.File;
034: import java.io.FileFilter;
035: import java.io.FileInputStream;
036: import java.io.IOException;
037: import java.io.InputStream;
038: import java.util.Set;
039:
040: /**
041: * A polling component which looks for a file or files in a directory
042: * and sends the files into the JBI bus as messages, deleting the files
043: * by default when they are processed.
044: *
045: * @version $Revision: 564081 $
046: */
047: public class FilePoller extends PollingComponentSupport {
048: private static final Log log = LogFactory.getLog(FilePoller.class);
049:
050: private File archive;
051: private File file;
052: private FileFilter filter;
053: private boolean deleteFile = true;
054: private boolean recursive = true;
055: private boolean autoCreateDirectory = true;
056: private FileMarshaler marshaler = new DefaultFileMarshaler();
057: private Set workingSet = new CopyOnWriteArraySet();
058:
059: public void poll() throws Exception {
060: pollFileOrDirectory(file);
061: }
062:
063: // Properties
064: //-------------------------------------------------------------------------
065: public File getFile() {
066: return file;
067: }
068:
069: /**
070: * Sets the file to poll, which can be a directory or a file.
071: *
072: * @param file
073: */
074: public void setFile(File file) {
075: this .file = file;
076: }
077:
078: public FileFilter getFilter() {
079: return filter;
080: }
081:
082: /**
083: * Sets the optional filter to choose which files to process
084: */
085: public void setFilter(FileFilter filter) {
086: this .filter = filter;
087: }
088:
089: /**
090: * Returns whether or not we should delete the file when its processed
091: */
092: public boolean isDeleteFile() {
093: return deleteFile;
094: }
095:
096: public void setDeleteFile(boolean deleteFile) {
097: this .deleteFile = deleteFile;
098: }
099:
100: public boolean isRecursive() {
101: return recursive;
102: }
103:
104: public void setRecursive(boolean recursive) {
105: this .recursive = recursive;
106: }
107:
108: public boolean isAutoCreateDirectory() {
109: return autoCreateDirectory;
110: }
111:
112: public void setAutoCreateDirectory(boolean autoCreateDirectory) {
113: this .autoCreateDirectory = autoCreateDirectory;
114: }
115:
116: public FileMarshaler getMarshaler() {
117: return marshaler;
118: }
119:
120: public void setMarshaler(FileMarshaler marshaler) {
121: this .marshaler = marshaler;
122: }
123:
124: public File getArchive() {
125: return archive;
126: }
127:
128: /**
129: * Configure a directory to archive files before deleting them.
130: *
131: * @param archive the archive directory
132: */
133: public void setArchive(File archive) {
134: this .archive = archive;
135: }
136:
137: /**
138: * The set of FTPFiles that this component is currently working on
139: *
140: * @return
141: */
142: public Set getWorkingSet() {
143: return workingSet;
144: }
145:
146: // Implementation methods
147: //-------------------------------------------------------------------------
148: protected void init() throws JBIException {
149: if (file == null) {
150: throw new IllegalArgumentException(
151: "You must specify a file property");
152: }
153: if (isAutoCreateDirectory() && !file.exists()) {
154: file.mkdirs();
155: }
156: if (archive != null) {
157: if (!deleteFile) {
158: throw new DeploymentException(
159: "Archive shouldn't be specified unless deleteFile='true'");
160: }
161: if (isAutoCreateDirectory() && !archive.exists()) {
162: archive.mkdirs();
163: }
164: if (!archive.isDirectory()) {
165: throw new DeploymentException(
166: "Archive should refer to a directory");
167: }
168: }
169: super .init();
170: }
171:
172: protected void pollFileOrDirectory(File fileOrDirectory) {
173: pollFileOrDirectory(fileOrDirectory, true);
174: }
175:
176: protected void pollFileOrDirectory(File fileOrDirectory,
177: boolean processDir) {
178: if (!fileOrDirectory.isDirectory()) {
179: pollFile(fileOrDirectory); // process the file
180: } else if (processDir) {
181: log.debug("Polling directory " + fileOrDirectory);
182: File[] files = fileOrDirectory.listFiles(getFilter());
183: for (int i = 0; i < files.length; i++) {
184: pollFileOrDirectory(files[i], isRecursive()); // self-recursion
185: }
186: } else {
187: log.debug("Skipping directory " + fileOrDirectory);
188: }
189: }
190:
191: protected void pollFile(final File aFile) {
192: if (workingSet.add(aFile)) {
193: if (log.isDebugEnabled()) {
194: log.debug("Scheduling file " + aFile
195: + " for processing");
196: }
197: getExecutor().execute(new Runnable() {
198: public void run() {
199: try {
200: processFileAndDelete(aFile);
201: } finally {
202: workingSet.remove(aFile);
203: }
204: }
205: });
206: }
207: }
208:
209: protected void processFileAndDelete(File aFile) {
210: try {
211: if (log.isDebugEnabled()) {
212: log.debug("Processing file " + aFile);
213: }
214: if (aFile.exists()) {
215: processFile(aFile);
216: if (isDeleteFile()) {
217: if (archive != null) {
218: FileUtil.moveFile(aFile, archive);
219: } else {
220: if (!aFile.delete()) {
221: throw new IOException(
222: "Could not delete file " + aFile);
223: }
224: }
225: }
226: }
227: } catch (Exception e) {
228: log.error("Failed to process file: " + aFile + ". Reason: "
229: + e, e);
230: }
231: }
232:
233: protected void processFile(File aFile) throws Exception {
234: InputStream in = null;
235: try {
236: String name = aFile.getCanonicalPath();
237: in = new BufferedInputStream(new FileInputStream(aFile));
238: InOnly exchange = getExchangeFactory()
239: .createInOnlyExchange();
240: NormalizedMessage message = exchange.createMessage();
241: exchange.setInMessage(message);
242: marshaler.readMessage(exchange, message, in, name);
243: getDeliveryChannel().sendSync(exchange);
244: } finally {
245: if (in != null) {
246: in.close();
247: }
248: }
249: }
250: }
|