001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.file;
018:
019: import java.io.BufferedInputStream;
020: import java.io.File;
021: import java.io.FileFilter;
022: import java.io.FileInputStream;
023: import java.io.IOException;
024: import java.io.InputStream;
025:
026: import java.util.concurrent.locks.Lock;
027:
028: import javax.jbi.JBIException;
029: import javax.jbi.management.DeploymentException;
030: import javax.jbi.messaging.ExchangeStatus;
031: import javax.jbi.messaging.InOnly;
032: import javax.jbi.messaging.MessageExchange;
033: import javax.jbi.messaging.NormalizedMessage;
034: import javax.jbi.servicedesc.ServiceEndpoint;
035: import javax.xml.namespace.QName;
036:
037: import org.apache.servicemix.common.DefaultComponent;
038: import org.apache.servicemix.common.ServiceUnit;
039: import org.apache.servicemix.common.endpoints.PollingEndpoint;
040: import org.apache.servicemix.components.util.DefaultFileMarshaler;
041: import org.apache.servicemix.components.util.FileMarshaler;
042: import org.apache.servicemix.jbi.util.FileUtil;
043: import org.apache.servicemix.locks.LockManager;
044: import org.apache.servicemix.locks.impl.SimpleLockManager;
045:
046: /**
047: * A polling endpoint which looks for a file or files in a directory
048: * and sends the files into the JBI bus as messages, deleting the files
049: * by default when they are processed.
050: *
051: * @org.apache.xbean.XBean element="poller"
052: *
053: * @version $Revision: 564081 $
054: */
055: public class FilePollerEndpoint extends PollingEndpoint implements
056: FileEndpointType {
057:
058: private File file;
059: private FileFilter filter;
060: private boolean deleteFile = true;
061: private boolean recursive = true;
062: private boolean autoCreateDirectory = true;
063: private File archive;
064: private FileMarshaler marshaler = new DefaultFileMarshaler();
065: private LockManager lockManager;
066:
067: public FilePollerEndpoint() {
068: }
069:
070: public FilePollerEndpoint(ServiceUnit serviceUnit, QName service,
071: String endpoint) {
072: super (serviceUnit, service, endpoint);
073: }
074:
075: public FilePollerEndpoint(DefaultComponent component,
076: ServiceEndpoint endpoint) {
077: super (component, endpoint);
078: }
079:
080: public void poll() throws Exception {
081: pollFileOrDirectory(file);
082: }
083:
084: public void validate() throws DeploymentException {
085: super .validate();
086: if (file == null) {
087: throw new DeploymentException(
088: "You must specify a file property");
089: }
090: if (isAutoCreateDirectory() && !file.exists()) {
091: file.mkdirs();
092: }
093: if (archive != null) {
094: if (!deleteFile) {
095: throw new DeploymentException(
096: "Archive shouldn't be specified unless deleteFile='true'");
097: }
098: if (isAutoCreateDirectory() && !archive.exists()) {
099: archive.mkdirs();
100: }
101: if (!archive.isDirectory()) {
102: throw new DeploymentException(
103: "Archive should refer to a directory");
104: }
105: }
106: if (lockManager == null) {
107: lockManager = createLockManager();
108: }
109: }
110:
111: protected LockManager createLockManager() {
112: return new SimpleLockManager();
113: }
114:
115: // Properties
116: //-------------------------------------------------------------------------
117: public File getFile() {
118: return file;
119: }
120:
121: /**
122: * Sets the file to poll, which can be a directory or a file.
123: *
124: * @param file
125: */
126: public void setFile(File file) {
127: this .file = file;
128: }
129:
130: /**
131: * @return the lockManager
132: */
133: public LockManager getLockManager() {
134: return lockManager;
135: }
136:
137: /**
138: * @param lockManager the lockManager to set
139: */
140: public void setLockManager(LockManager lockManager) {
141: this .lockManager = lockManager;
142: }
143:
144: public FileFilter getFilter() {
145: return filter;
146: }
147:
148: /**
149: * Sets the optional filter to choose which files to process
150: */
151: public void setFilter(FileFilter filter) {
152: this .filter = filter;
153: }
154:
155: /**
156: * Returns whether or not we should delete the file when its processed
157: */
158: public boolean isDeleteFile() {
159: return deleteFile;
160: }
161:
162: public void setDeleteFile(boolean deleteFile) {
163: this .deleteFile = deleteFile;
164: }
165:
166: public boolean isRecursive() {
167: return recursive;
168: }
169:
170: public void setRecursive(boolean recursive) {
171: this .recursive = recursive;
172: }
173:
174: public boolean isAutoCreateDirectory() {
175: return autoCreateDirectory;
176: }
177:
178: public void setAutoCreateDirectory(boolean autoCreateDirectory) {
179: this .autoCreateDirectory = autoCreateDirectory;
180: }
181:
182: public FileMarshaler getMarshaler() {
183: return marshaler;
184: }
185:
186: public void setMarshaler(FileMarshaler marshaler) {
187: this .marshaler = marshaler;
188: }
189:
190: public File getArchive() {
191: return archive;
192: }
193:
194: /**
195: * Configure a directory to archive files before deleting them.
196: *
197: * @param archive the archive directory
198: */
199: public void setArchive(File archive) {
200: this .archive = archive;
201: }
202:
203: // Implementation methods
204: //-------------------------------------------------------------------------
205:
206: protected void pollFileOrDirectory(File fileOrDirectory) {
207: pollFileOrDirectory(fileOrDirectory, true);
208: }
209:
210: protected void pollFileOrDirectory(File fileOrDirectory,
211: boolean processDir) {
212: if (!fileOrDirectory.isDirectory()) {
213: pollFile(fileOrDirectory); // process the file
214: } else if (processDir) {
215: logger.debug("Polling directory " + fileOrDirectory);
216: File[] files = fileOrDirectory.listFiles(getFilter());
217: for (int i = 0; i < files.length; i++) {
218: pollFileOrDirectory(files[i], isRecursive()); // self-recursion
219: }
220: } else {
221: logger.debug("Skipping directory " + fileOrDirectory);
222: }
223: }
224:
225: protected void pollFile(final File aFile) {
226: if (logger.isDebugEnabled()) {
227: logger
228: .debug("Scheduling file " + aFile
229: + " for processing");
230: }
231: getExecutor().execute(new Runnable() {
232: public void run() {
233: String uri = file.toURI().relativize(aFile.toURI())
234: .toString();
235: Lock lock = lockManager.getLock(uri);
236: if (lock.tryLock()) {
237: boolean unlock = true;
238: try {
239: unlock = processFileAndDelete(aFile);
240: } finally {
241: if (unlock) {
242: lock.unlock();
243: }
244: }
245: } else {
246: if (logger.isDebugEnabled()) {
247: logger.debug("Unable to acquire lock on "
248: + aFile);
249: }
250: }
251: }
252: });
253: }
254:
255: protected boolean processFileAndDelete(File aFile) {
256: boolean unlock = true;
257: try {
258: if (logger.isDebugEnabled()) {
259: logger.debug("Processing file " + aFile);
260: }
261: if (aFile.exists()) {
262: processFile(aFile);
263: unlock = false;
264: if (isDeleteFile()) {
265: if (archive != null) {
266: FileUtil.moveFile(aFile, archive);
267: } else {
268: if (!aFile.delete()) {
269: throw new IOException(
270: "Could not delete file " + aFile);
271: }
272: }
273: unlock = true;
274: }
275: }
276: } catch (Exception e) {
277: logger.error("Failed to process file: " + aFile
278: + ". Reason: " + e, e);
279: }
280: return unlock;
281: }
282:
283: protected void processFile(File aFile) throws Exception {
284: InputStream in = null;
285: try {
286: String name = aFile.getCanonicalPath();
287: in = new BufferedInputStream(new FileInputStream(aFile));
288: InOnly exchange = getExchangeFactory()
289: .createInOnlyExchange();
290: configureExchangeTarget(exchange);
291: NormalizedMessage message = exchange.createMessage();
292: exchange.setInMessage(message);
293: marshaler.readMessage(exchange, message, in, name);
294: sendSync(exchange);
295: if (exchange.getStatus() == ExchangeStatus.ERROR) {
296: Exception e = exchange.getError();
297: if (e == null) {
298: e = new JBIException("Unkown error");
299: }
300: throw e;
301: }
302: } finally {
303: if (in != null) {
304: in.close();
305: }
306: }
307: }
308:
309: public String getLocationURI() {
310: return file.toURI().toString();
311: }
312:
313: public void process(MessageExchange exchange) throws Exception {
314: // Do nothing. In our case, this method should never be called
315: // as we only send synchronous InOnly exchange
316: }
317: }
|