001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.ftp;
018:
019: import java.io.File;
020: import java.io.FileFilter;
021: import java.io.IOException;
022: import java.io.InputStream;
023: import java.net.URI;
024: import java.util.concurrent.locks.Lock;
025:
026: import javax.jbi.JBIException;
027: import javax.jbi.management.DeploymentException;
028: import javax.jbi.messaging.ExchangeStatus;
029: import javax.jbi.messaging.InOnly;
030: import javax.jbi.messaging.MessageExchange;
031: import javax.jbi.messaging.NormalizedMessage;
032: import javax.jbi.servicedesc.ServiceEndpoint;
033: import javax.xml.namespace.QName;
034:
035: import org.apache.commons.net.ftp.FTPClient;
036: import org.apache.commons.net.ftp.FTPFile;
037: import org.apache.servicemix.common.DefaultComponent;
038: import org.apache.servicemix.common.ServiceUnit;
039: import org.apache.servicemix.common.endpoints.PollingEndpoint;
040: import org.apache.servicemix.components.util.DefaultFileMarshaler;
041: import org.apache.servicemix.components.util.FileMarshaler;
042: import org.apache.servicemix.locks.LockManager;
043: import org.apache.servicemix.locks.impl.SimpleLockManager;
044:
045: /**
046: * A polling endpoint which looks for a file or files in a directory
047: * and sends the files into the JBI bus as messages, deleting the files
048: * by default when they are processed.
049: *
050: * @org.apache.xbean.XBean element="poller"
051: *
052: * @version $Revision: 468487 $
053: */
054: public class FtpPollerEndpoint extends PollingEndpoint implements
055: FtpEndpointType {
056:
057: private FTPClientPool clientPool;
058: private FileFilter filter;
059: private boolean deleteFile = true;
060: private boolean recursive = true;
061: private FileMarshaler marshaler = new DefaultFileMarshaler();
062: private LockManager lockManager;
063: private QName targetOperation;
064: private URI uri;
065:
066: public FtpPollerEndpoint() {
067: }
068:
069: public FtpPollerEndpoint(ServiceUnit serviceUnit, QName service,
070: String endpoint) {
071: super (serviceUnit, service, endpoint);
072: }
073:
074: public FtpPollerEndpoint(DefaultComponent component,
075: ServiceEndpoint endpoint) {
076: super (component, endpoint);
077: }
078:
079: public void poll() throws Exception {
080: pollFileOrDirectory(getWorkingPath());
081: }
082:
083: public void validate() throws DeploymentException {
084: super .validate();
085: if (uri == null
086: && (getClientPool() == null || getClientPool()
087: .getHost() == null)) {
088: throw new DeploymentException(
089: "Property uri or clientPool.host must be configured");
090: }
091: if (uri != null && getClientPool() != null
092: && getClientPool().getHost() != null) {
093: throw new DeploymentException(
094: "Properties uri and clientPool.host can not be configured at the same time");
095: }
096: }
097:
098: public void start() throws Exception {
099: if (lockManager == null) {
100: lockManager = createLockManager();
101: }
102: if (clientPool == null) {
103: clientPool = createClientPool();
104: }
105: if (uri != null) {
106: clientPool.setHost(uri.getHost());
107: clientPool.setPort(uri.getPort());
108: if (uri.getUserInfo() != null) {
109: String[] infos = uri.getUserInfo().split(":");
110: clientPool.setUsername(infos[0]);
111: if (infos.length > 1) {
112: clientPool.setPassword(infos[1]);
113: }
114: }
115: } else {
116: String str = "ftp://" + clientPool.getHost();
117: if (clientPool.getPort() >= 0) {
118: str += ":" + clientPool.getPort();
119: }
120: str += "/";
121: uri = new URI(str);
122: }
123: super .start();
124: }
125:
126: protected LockManager createLockManager() {
127: return new SimpleLockManager();
128: }
129:
130: private String getWorkingPath() {
131: return (uri != null && uri.getPath() != null) ? uri.getPath()
132: : ".";
133: }
134:
135: // Properties
136: //-------------------------------------------------------------------------
137: /**
138: * @return the clientPool
139: */
140: public FTPClientPool getClientPool() {
141: return clientPool;
142: }
143:
144: /**
145: * @param clientPool the clientPool to set
146: */
147: public void setClientPool(FTPClientPool clientPool) {
148: this .clientPool = clientPool;
149: }
150:
151: /**
152: * @return the uri
153: */
154: public URI getUri() {
155: return uri;
156: }
157:
158: /**
159: * @param uri the uri to set
160: */
161: public void setUri(URI uri) {
162: this .uri = uri;
163: }
164:
165: public FileFilter getFilter() {
166: return filter;
167: }
168:
169: /**
170: * Sets the optional filter to choose which files to process
171: */
172: public void setFilter(FileFilter filter) {
173: this .filter = filter;
174: }
175:
176: /**
177: * Returns whether or not we should delete the file when its processed
178: */
179: public boolean isDeleteFile() {
180: return deleteFile;
181: }
182:
183: public void setDeleteFile(boolean deleteFile) {
184: this .deleteFile = deleteFile;
185: }
186:
187: public boolean isRecursive() {
188: return recursive;
189: }
190:
191: public void setRecursive(boolean recursive) {
192: this .recursive = recursive;
193: }
194:
195: public FileMarshaler getMarshaler() {
196: return marshaler;
197: }
198:
199: public void setMarshaler(FileMarshaler marshaler) {
200: this .marshaler = marshaler;
201: }
202:
203: public QName getTargetOperation() {
204: return targetOperation;
205: }
206:
207: public void setTargetOperation(QName targetOperation) {
208: this .targetOperation = targetOperation;
209: }
210:
211: // Implementation methods
212: //-------------------------------------------------------------------------
213:
214: protected void pollFileOrDirectory(String fileOrDirectory)
215: throws Exception {
216: FTPClient ftp = borrowClient();
217: try {
218: logger.debug("Polling directory " + fileOrDirectory);
219: pollFileOrDirectory(ftp, fileOrDirectory, isRecursive());
220: } finally {
221: returnClient(ftp);
222: }
223: }
224:
225: protected void pollFileOrDirectory(FTPClient ftp,
226: String fileOrDirectory, boolean processDir)
227: throws Exception {
228: FTPFile[] files = ftp.listFiles(fileOrDirectory);
229: for (int i = 0; i < files.length; i++) {
230: String name = files[i].getName();
231: if (".".equals(name) || "..".equals(name)) {
232: continue; // ignore "." and ".."
233: }
234: String file = fileOrDirectory + "/" + name;
235: // This is a file, process it
236: if (!files[i].isDirectory()) {
237: if (getFilter() == null
238: || getFilter().accept(new File(file))) {
239: pollFile(file); // process the file
240: }
241: // Only process directories if processDir is true
242: } else if (processDir) {
243: if (logger.isDebugEnabled()) {
244: logger.debug("Polling directory " + file);
245: }
246: pollFileOrDirectory(ftp, file, isRecursive());
247: } else {
248: if (logger.isDebugEnabled()) {
249: logger.debug("Skipping directory " + file);
250: }
251: }
252: }
253: }
254:
255: protected void pollFile(final String file) {
256: if (logger.isDebugEnabled()) {
257: logger.debug("Scheduling file " + file + " for processing");
258: }
259: getExecutor().execute(new Runnable() {
260: public void run() {
261: final Lock lock = lockManager.getLock(file);
262: if (lock.tryLock()) {
263: boolean unlock = true;
264: try {
265: unlock = processFileAndDelete(file);
266: } finally {
267: if (unlock) {
268: lock.unlock();
269: }
270: }
271: }
272: }
273: });
274: }
275:
276: protected boolean processFileAndDelete(String file) {
277: FTPClient ftp = null;
278: boolean unlock = true;
279: try {
280: ftp = borrowClient();
281: if (logger.isDebugEnabled()) {
282: logger.debug("Processing file " + file);
283: }
284: if (ftp.listFiles(file).length > 0) {
285: // Process the file. If processing fails, an exception should be thrown.
286: processFile(ftp, file);
287: // Processing is successful
288: // We should not unlock until the file has been deleted
289: unlock = false;
290: if (isDeleteFile()) {
291: if (!ftp.deleteFile(file)) {
292: throw new IOException("Could not delete file "
293: + file);
294: }
295: unlock = true;
296: }
297: } else {
298: //avoid processing files that have been deleted on the server
299: logger.debug("Skipping " + file
300: + ": the file no longer exists on the server");
301: }
302: } catch (Exception e) {
303: logger.error("Failed to process file: " + file
304: + ". Reason: " + e, e);
305: } finally {
306: returnClient(ftp);
307: }
308: return unlock;
309: }
310:
311: protected void processFile(FTPClient ftp, String file)
312: throws Exception {
313: InputStream in = ftp.retrieveFileStream(file);
314: InOnly exchange = getExchangeFactory().createInOnlyExchange();
315: configureExchangeTarget(exchange);
316: NormalizedMessage message = exchange.createMessage();
317: exchange.setInMessage(message);
318:
319: if (getTargetOperation() != null) {
320: exchange.setOperation(getTargetOperation());
321: }
322:
323: marshaler.readMessage(exchange, message, in, file);
324: sendSync(exchange);
325: in.close();
326: ftp.completePendingCommand();
327: if (exchange.getStatus() == ExchangeStatus.ERROR) {
328: Exception e = exchange.getError();
329: if (e == null) {
330: e = new JBIException("Unkown error");
331: }
332: throw e;
333: }
334: }
335:
336: public String getLocationURI() {
337: return uri.toString();
338: }
339:
340: public void process(MessageExchange exchange) throws Exception {
341: // Do nothing. In our case, this method should never be called
342: // as we only send synchronous InOnly exchange
343: }
344:
345: protected FTPClientPool createClientPool() throws Exception {
346: FTPClientPool pool = new FTPClientPool();
347: pool.afterPropertiesSet();
348: return pool;
349: }
350:
351: protected FTPClient borrowClient() throws JBIException {
352: try {
353: return (FTPClient) getClientPool().borrowClient();
354: } catch (Exception e) {
355: throw new JBIException(e);
356: }
357: }
358:
359: protected void returnClient(FTPClient client) {
360: if (client != null) {
361: try {
362: getClientPool().returnClient(client);
363: } catch (Exception e) {
364: logger
365: .error("Failed to return client to pool: " + e,
366: e);
367: }
368: }
369: }
370:
371: }
|