001: /*
002: * $Id: FtpConnector.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.ftp;
012:
013: import org.mule.api.MuleException;
014: import org.mule.api.MuleMessage;
015: import org.mule.api.MuleRuntimeException;
016: import org.mule.api.endpoint.EndpointURI;
017: import org.mule.api.endpoint.ImmutableEndpoint;
018: import org.mule.api.endpoint.InboundEndpoint;
019: import org.mule.api.lifecycle.InitialisationException;
020: import org.mule.api.service.Service;
021: import org.mule.api.transport.ConnectorException;
022: import org.mule.api.transport.DispatchException;
023: import org.mule.api.transport.MessageReceiver;
024: import org.mule.config.i18n.CoreMessages;
025: import org.mule.config.i18n.MessageFactory;
026: import org.mule.model.streaming.CallbackOutputStream;
027: import org.mule.transport.AbstractConnector;
028: import org.mule.transport.file.FilenameParser;
029: import org.mule.transport.file.SimpleFilenameParser;
030: import org.mule.util.ClassUtils;
031:
032: import java.io.IOException;
033: import java.io.OutputStream;
034: import java.text.MessageFormat;
035: import java.util.HashMap;
036: import java.util.Iterator;
037: import java.util.Map;
038:
039: import org.apache.commons.net.ftp.FTPClient;
040: import org.apache.commons.pool.ObjectPool;
041: import org.apache.commons.pool.impl.GenericObjectPool;
042:
043: public class FtpConnector extends AbstractConnector {
044:
045: public static final String FTP = "ftp";
046:
047: // endpoint properties
048: public static final String PROPERTY_POLLING_FREQUENCY = "pollingFrequency"; // inbound only
049: public static final int DEFAULT_POLLING_FREQUENCY = 1000;
050: public static final String PROPERTY_OUTPUT_PATTERN = "outputPattern"; // outbound only
051: public static final String PROPERTY_PASSIVE_MODE = "passive";
052: public static final String PROPERTY_BINARY_TRANSFER = "binary";
053:
054: // message properties
055: public static final String PROPERTY_FILENAME = "filename";
056:
057: /**
058: * TODO it makes sense to have a type-safe adapter for FTP specifically, but without
059: * Java 5's covariant return types the benefits are diminished. Keeping it simple for now.
060: */
061: public static final String DEFAULT_FTP_CONNECTION_FACTORY_CLASS = "org.mule.transport.ftp.FtpConnectionFactory";
062:
063: /**
064: * Time in milliseconds to poll. On each poll the poll() method is called
065: */
066: private long pollingFrequency;
067:
068: private String outputPattern;
069:
070: private FilenameParser filenameParser = new SimpleFilenameParser();
071:
072: private boolean passive = true;
073:
074: private boolean binary = true;
075:
076: /**
077: * Whether to test FTP connection on each take from pool.
078: */
079: private boolean validateConnections = true;
080:
081: private Map pools = new HashMap();
082:
083: private String connectionFactoryClass = DEFAULT_FTP_CONNECTION_FACTORY_CLASS;
084:
085: public String getProtocol() {
086: return FTP;
087: }
088:
089: public MessageReceiver createReceiver(Service service,
090: InboundEndpoint endpoint) throws Exception {
091: long polling = pollingFrequency;
092: Map props = endpoint.getProperties();
093: if (props != null) {
094: // Override properties on the endpoint for the specific endpoint
095: String tempPolling = (String) props
096: .get(PROPERTY_POLLING_FREQUENCY);
097: if (tempPolling != null) {
098: polling = Long.parseLong(tempPolling);
099: }
100: }
101: if (polling <= 0) {
102: polling = DEFAULT_POLLING_FREQUENCY;
103: }
104: logger.debug("set polling frequency to " + polling);
105: return serviceDescriptor.createMessageReceiver(this , service,
106: endpoint, new Object[] { new Long(polling) });
107: }
108:
109: /**
110: * @return Returns the pollingFrequency.
111: */
112: public long getPollingFrequency() {
113: return pollingFrequency;
114: }
115:
116: /**
117: * @param pollingFrequency The pollingFrequency to set.
118: */
119: public void setPollingFrequency(long pollingFrequency) {
120: this .pollingFrequency = pollingFrequency;
121: }
122:
123: /**
124: * Getter for property 'connectionFactoryClass'.
125: *
126: * @return Value for property 'connectionFactoryClass'.
127: */
128: public String getConnectionFactoryClass() {
129: return connectionFactoryClass;
130: }
131:
132: /**
133: * Setter for property 'connectionFactoryClass'. Should be an instance of
134: * {@link FtpConnectionFactory}.
135: *
136: * @param connectionFactoryClass Value to set for property 'connectionFactoryClass'.
137: */
138: public void setConnectionFactoryClass(
139: final String connectionFactoryClass) {
140: this .connectionFactoryClass = connectionFactoryClass;
141: }
142:
143: public FTPClient getFtp(EndpointURI uri) throws Exception {
144: if (logger.isDebugEnabled()) {
145: logger.debug(">>> retrieving client for " + uri);
146: }
147: return (FTPClient) getFtpPool(uri).borrowObject();
148: }
149:
150: public void releaseFtp(EndpointURI uri, FTPClient client)
151: throws Exception {
152: if (logger.isDebugEnabled()) {
153: logger.debug("<<< releasing client for " + uri);
154: }
155: if (dispatcherFactory.isCreateDispatcherPerRequest()) {
156: destroyFtp(uri, client);
157: } else {
158: getFtpPool(uri).returnObject(client);
159: }
160: }
161:
162: public void destroyFtp(EndpointURI uri, FTPClient client)
163: throws Exception {
164: if (logger.isDebugEnabled()) {
165: logger.debug("<<< destroying client for " + uri);
166: }
167: try {
168: getFtpPool(uri).invalidateObject(client);
169: } catch (Exception e) {
170: // no way to test if pool is closed except try to access it
171: logger.debug(e.getMessage());
172: }
173: }
174:
175: protected synchronized ObjectPool getFtpPool(EndpointURI uri) {
176: if (logger.isDebugEnabled()) {
177: logger.debug("=== get pool for " + uri);
178: }
179: String key = uri.getUser() + ":" + uri.getPassword() + "@"
180: + uri.getHost() + ":" + uri.getPort();
181: ObjectPool pool = (ObjectPool) pools.get(key);
182: if (pool == null) {
183: try {
184: FtpConnectionFactory connectionFactory = (FtpConnectionFactory) ClassUtils
185: .instanciateClass(getConnectionFactoryClass(),
186: new Object[] { uri }, getClass());
187: pool = new GenericObjectPool(connectionFactory);
188: ((GenericObjectPool) pool)
189: .setTestOnBorrow(this .validateConnections);
190: pools.put(key, pool);
191: } catch (Exception ex) {
192: throw new MuleRuntimeException(
193: MessageFactory
194: .createStaticMessage("Hmm, couldn't instanciate FTP connection factory."),
195: ex);
196: }
197: }
198: return pool;
199: }
200:
201: protected void doInitialise() throws InitialisationException {
202: try {
203: Class objectFactoryClass = ClassUtils.loadClass(
204: this .connectionFactoryClass, getClass());
205: if (!FtpConnectionFactory.class
206: .isAssignableFrom(objectFactoryClass)) {
207: throw new InitialisationException(
208: MessageFactory
209: .createStaticMessage("FTP connectionFactoryClass is not an instance of org.mule.transport.ftp.FtpConnectionFactory"),
210: this );
211: }
212: } catch (ClassNotFoundException e) {
213: throw new InitialisationException(e, this );
214: }
215: }
216:
217: protected void doDispose() {
218: // template method
219: }
220:
221: protected void doConnect() throws Exception {
222: // template method
223: }
224:
225: protected void doDisconnect() throws Exception {
226: // template method
227: }
228:
229: protected void doStart() throws MuleException {
230: // template method
231: }
232:
233: protected void doStop() throws MuleException {
234: if (logger.isDebugEnabled()) {
235: logger.debug("!!! stopping all pools");
236: }
237: try {
238: for (Iterator it = pools.values().iterator(); it.hasNext();) {
239: ObjectPool pool = (ObjectPool) it.next();
240: pool.close();
241: }
242: } catch (Exception e) {
243: throw new ConnectorException(CoreMessages
244: .failedToStop("FTP Connector"), this , e);
245: }
246: }
247:
248: /**
249: * @return Returns the outputPattern.
250: */
251: public String getOutputPattern() {
252: return outputPattern;
253: }
254:
255: /**
256: * @param outputPattern The outputPattern to set.
257: */
258: public void setOutputPattern(String outputPattern) {
259: this .outputPattern = outputPattern;
260: }
261:
262: /**
263: * @return Returns the filenameParser.
264: */
265: public FilenameParser getFilenameParser() {
266: return filenameParser;
267: }
268:
269: /**
270: * @param filenameParser The filenameParser to set.
271: */
272: public void setFilenameParser(FilenameParser filenameParser) {
273: this .filenameParser = filenameParser;
274: }
275:
276: /**
277: * Getter for FTP passive mode.
278: *
279: * @return true if using FTP passive mode
280: */
281: public boolean isPassive() {
282: return passive;
283: }
284:
285: /**
286: * Setter for FTP passive mode.
287: *
288: * @param passive passive mode flag
289: */
290: public void setPassive(final boolean passive) {
291: this .passive = passive;
292: }
293:
294: /**
295: * Passive mode is OFF by default. The value is taken from the connector
296: * settings. In case there are any overriding properties set on the endpoint,
297: * those will be used.
298: *
299: * @see #setPassive(boolean)
300: */
301: public void enterActiveOrPassiveMode(FTPClient client,
302: ImmutableEndpoint endpoint) {
303: // well, no endpoint URI here, as we have to use the most common denominator
304: // in API :(
305: final String passiveString = (String) endpoint
306: .getProperty(FtpConnector.PROPERTY_PASSIVE_MODE);
307: if (passiveString == null) {
308: // try the connector properties then
309: if (isPassive()) {
310: if (logger.isTraceEnabled()) {
311: logger.trace("Entering FTP passive mode");
312: }
313: client.enterLocalPassiveMode();
314: } else {
315: if (logger.isTraceEnabled()) {
316: logger.trace("Entering FTP active mode");
317: }
318: client.enterLocalActiveMode();
319: }
320: } else {
321: // override with endpoint's definition
322: final boolean passiveMode = Boolean.valueOf(passiveString)
323: .booleanValue();
324: if (passiveMode) {
325: if (logger.isTraceEnabled()) {
326: logger
327: .trace("Entering FTP passive mode (endpoint override)");
328: }
329: client.enterLocalPassiveMode();
330: } else {
331: if (logger.isTraceEnabled()) {
332: logger
333: .trace("Entering FTP active mode (endpoint override)");
334: }
335: client.enterLocalActiveMode();
336: }
337: }
338: }
339:
340: /**
341: * Whether to test FTP connection on each take from pool.
342: */
343: public boolean isValidateConnections() {
344: return validateConnections;
345: }
346:
347: /**
348: * Whether to test FTP connection on each take from pool. This takes care of a
349: * failed (or restarted) FTP server at the expense of an additional NOOP command
350: * packet being sent, but increases overall availability. <p/> Disable to gain
351: * slight performance gain or if you are absolutely sure of the FTP server
352: * availability. <p/> The default value is <code>true</code>
353: */
354: public void setValidateConnections(final boolean validateConnections) {
355: this .validateConnections = validateConnections;
356: }
357:
358: /**
359: * Getter for FTP transfer type.
360: *
361: * @return true if using FTP binary type
362: */
363: public boolean isBinary() {
364: return binary;
365: }
366:
367: /**
368: * Setter for FTP transfer type.
369: *
370: * @param binary binary type flag
371: */
372: public void setBinary(final boolean binary) {
373: this .binary = binary;
374: }
375:
376: /**
377: * Transfer type is BINARY by default. The value is taken from the connector
378: * settings. In case there are any overriding properties set on the endpoint,
379: * those will be used. <p/> The alternative type is ASCII. <p/>
380: *
381: * @see #setBinary(boolean)
382: */
383: public void setupFileType(FTPClient client,
384: ImmutableEndpoint endpoint) throws Exception {
385: int type;
386:
387: // well, no endpoint URI here, as we have to use the most common denominator
388: // in API :(
389: final String binaryTransferString = (String) endpoint
390: .getProperty(FtpConnector.PROPERTY_BINARY_TRANSFER);
391: if (binaryTransferString == null) {
392: // try the connector properties then
393: if (isBinary()) {
394: if (logger.isTraceEnabled()) {
395: logger.trace("Using FTP BINARY type");
396: }
397: type = org.apache.commons.net.ftp.FTP.BINARY_FILE_TYPE;
398: } else {
399: if (logger.isTraceEnabled()) {
400: logger.trace("Using FTP ASCII type");
401: }
402: type = org.apache.commons.net.ftp.FTP.ASCII_FILE_TYPE;
403: }
404: } else {
405: // override with endpoint's definition
406: final boolean binaryTransfer = Boolean.valueOf(
407: binaryTransferString).booleanValue();
408: if (binaryTransfer) {
409: if (logger.isTraceEnabled()) {
410: logger
411: .trace("Using FTP BINARY type (endpoint override)");
412: }
413: type = org.apache.commons.net.ftp.FTP.BINARY_FILE_TYPE;
414: } else {
415: if (logger.isTraceEnabled()) {
416: logger
417: .trace("Using FTP ASCII type (endpoint override)");
418: }
419: type = org.apache.commons.net.ftp.FTP.ASCII_FILE_TYPE;
420: }
421: }
422:
423: client.setFileType(type);
424: }
425:
426: /**
427: * Well get the output stream (if any) for this type of transport. Typically this
428: * will be called only when Streaming is being used on an outbound endpoint
429: *
430: * @param endpoint the endpoint that releates to this Dispatcher
431: * @param message the current message being processed
432: * @return the output stream to use for this request or null if the transport
433: * does not support streaming
434: * @throws org.mule.api.MuleException
435: */
436: public OutputStream getOutputStream(ImmutableEndpoint endpoint,
437: MuleMessage message) throws MuleException {
438: try {
439: final EndpointURI uri = endpoint.getEndpointURI();
440: String filename = getFilename(endpoint, message);
441:
442: final FTPClient client = this .createFtpClient(endpoint);
443: try {
444: OutputStream out = client.storeFileStream(filename);
445: return new CallbackOutputStream(out,
446: new CallbackOutputStream.Callback() {
447: public void onClose() throws Exception {
448: try {
449: if (!client
450: .completePendingCommand()) {
451: client.logout();
452: client.disconnect();
453: throw new IOException(
454: "FTP Stream failed to complete pending request");
455: }
456: } finally {
457: releaseFtp(uri, client);
458: }
459: }
460: });
461: } catch (Exception e) {
462: logger.debug("Error getting output stream: ", e);
463: releaseFtp(uri, client);
464: throw e;
465: }
466: } catch (Exception e) {
467: throw new DispatchException(CoreMessages
468: .streamingFailedNoStream(), message, endpoint, e);
469: }
470: }
471:
472: private String getFilename(ImmutableEndpoint endpoint,
473: MuleMessage message) throws IOException {
474: String filename = (String) message
475: .getProperty(FtpConnector.PROPERTY_FILENAME);
476: if (filename == null) {
477: String outPattern = (String) endpoint
478: .getProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN);
479: if (outPattern == null) {
480: outPattern = message.getStringProperty(
481: FtpConnector.PROPERTY_OUTPUT_PATTERN,
482: getOutputPattern());
483: }
484: filename = generateFilename(message, outPattern);
485: }
486: if (filename == null) {
487: throw new IOException("Filename is null");
488: }
489: return filename;
490: }
491:
492: private String generateFilename(MuleMessage message, String pattern) {
493: if (pattern == null) {
494: pattern = getOutputPattern();
495: }
496: return getFilenameParser().getFilename(message, pattern);
497: }
498:
499: /**
500: * Creates a new FTPClient that logs in and changes the working directory using the data
501: * provided in <code>endpoint</code>.
502: */
503: protected FTPClient createFtpClient(ImmutableEndpoint endpoint)
504: throws Exception {
505: EndpointURI uri = endpoint.getEndpointURI();
506: FTPClient client = this .getFtp(uri);
507:
508: this .enterActiveOrPassiveMode(client, endpoint);
509: this .setupFileType(client, endpoint);
510:
511: String path = uri.getPath();
512: // MULE-2400: if the path begins with '~' we must strip the first '/' to make things
513: // work with FTPClient
514: if ((path.length() >= 2) && (path.charAt(1) == '~')) {
515: path = path.substring(1);
516: }
517:
518: if (!client.changeWorkingDirectory(path)) {
519: throw new IOException(
520: MessageFormat
521: .format(
522: "Failed to change working directory to {0}. Ftp error: {1}",
523: new Object[] {
524: path,
525: new Integer(client
526: .getReplyCode()) }));
527: }
528: return client;
529: }
530: }
|