001: /*
002: * $Id: FileConnector.java 11139 2008-02-29 20:36:46Z acooke $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.file;
012:
013: import org.mule.api.MessagingException;
014: import org.mule.api.MuleException;
015: import org.mule.api.MuleMessage;
016: import org.mule.api.config.MuleProperties;
017: import org.mule.api.endpoint.ImmutableEndpoint;
018: import org.mule.api.endpoint.InboundEndpoint;
019: import org.mule.api.lifecycle.InitialisationException;
020: import org.mule.api.service.Service;
021: import org.mule.api.transport.DispatchException;
022: import org.mule.api.transport.MessageAdapter;
023: import org.mule.api.transport.MessageReceiver;
024: import org.mule.config.i18n.CoreMessages;
025: import org.mule.transformer.simple.ByteArrayToSerializable;
026: import org.mule.transformer.simple.SerializableToByteArray;
027: import org.mule.transport.AbstractConnector;
028: import org.mule.transport.file.filters.FilenameWildcardFilter;
029: import org.mule.util.FileUtils;
030: import org.mule.util.MapUtils;
031:
032: import java.io.File;
033: import java.io.FileOutputStream;
034: import java.io.IOException;
035: import java.io.OutputStream;
036: import java.util.Map;
037: import java.util.Properties;
038:
039: import org.apache.commons.logging.Log;
040: import org.apache.commons.logging.LogFactory;
041:
042: /**
043: * <code>FileConnector</code> is used for setting up listeners on a directory and
044: * for writing files to a directory. The connecotry provides support for defining
045: * file output patterns and filters for receiving files.
046: */
047:
048: public class FileConnector extends AbstractConnector {
049:
050: public static final String FILE = "file";
051: private static Log logger = LogFactory.getLog(FileConnector.class);
052:
053: // These are properties that can be overridden on the Receiver by the endpoint declaration
054: // inbound only
055: public static final String PROPERTY_POLLING_FREQUENCY = "pollingFrequency";
056: public static final String PROPERTY_FILE_AGE = "fileAge";
057: public static final String PROPERTY_MOVE_TO_PATTERN = "moveToPattern";
058: public static final String PROPERTY_MOVE_TO_DIRECTORY = "moveToDirectory";
059: public static final String PROPERTY_READ_FROM_DIRECTORY = "readFromDirectoryName";
060: // outbound only
061: public static final String PROPERTY_OUTPUT_PATTERN = "outputPattern";
062: // apparently unused (once strange override code deleted)
063: // public static final String PROPERTY_DELETE_ON_READ = "autoDelete";
064: // public static final String PROPERTY_SERVICE_OVERRIDE = "serviceOverrides";
065:
066: // message properties
067: public static final String PROPERTY_FILENAME = "filename";
068: public static final String PROPERTY_ORIGINAL_FILENAME = "originalFilename";
069: public static final String PROPERTY_DIRECTORY = "directory";
070: public static final String PROPERTY_WRITE_TO_DIRECTORY = "writeToDirectoryName";
071: public static final String PROPERTY_FILE_SIZE = "fileSize";
072:
073: public static final long DEFAULT_POLLING_FREQUENCY = 1000;
074:
075: /**
076: * Time in milliseconds to poll. On each poll the poll() method is called
077: */
078: private long pollingFrequency = 0;
079:
080: private String moveToPattern = null;
081:
082: private String writeToDirectoryName = null;
083:
084: private String moveToDirectoryName = null;
085:
086: private String readFromDirectoryName = null;
087:
088: private String outputPattern = null;
089:
090: private boolean outputAppend = false;
091:
092: private boolean autoDelete = true;
093:
094: private boolean checkFileAge = false;
095:
096: private long fileAge = 0;
097:
098: private FileOutputStream outputStream = null;
099:
100: private boolean serialiseObjects = false;
101:
102: private boolean streaming = true;
103:
104: public FilenameParser filenameParser;
105:
106: /*
107: * (non-Javadoc)
108: *
109: * @see org.mule.transport.AbstractConnector#doInitialise()
110: */
111: public FileConnector() {
112: super ();
113: // MULE-1773: limit the number of dispatchers per endpoint to 1 until
114: // there is a proper (Distributed)LockManager in place (MULE-2402).
115: // We also override the setter to prevent "wrong" configuration for now.
116: super .setMaxDispatchersActive(1);
117: filenameParser = new SimpleFilenameParser();
118: }
119:
120: // @Override
121: public void setMaxDispatchersActive(int value) {
122: if (value != 1) {
123: throw new IllegalArgumentException(
124: "MULE-1773: cannot configure maxDispatchersActive");
125: }
126: }
127:
128: // @Override
129: protected Object getReceiverKey(Service service,
130: InboundEndpoint endpoint) {
131: if (endpoint.getFilter() != null) {
132: return endpoint.getEndpointURI().getAddress()
133: + "/"
134: + ((FilenameWildcardFilter) endpoint.getFilter())
135: .getPattern();
136: }
137: return endpoint.getEndpointURI().getAddress();
138: }
139:
140: /**
141: * Registers a listener for a particular directory The following properties can
142: * be overriden in the endpoint declaration
143: * <ul>
144: * <li>moveToDirectory</li>
145: * <li>filterPatterns</li>
146: * <li>filterClass</li>
147: * <li>pollingFrequency</li>
148: * </ul>
149: */
150: public MessageReceiver createReceiver(Service service,
151: InboundEndpoint endpoint) throws Exception {
152: String readDir = endpoint.getEndpointURI().getAddress();
153: if (null != getReadFromDirectory()) {
154: readDir = getReadFromDirectory();
155: }
156:
157: long polling = this .pollingFrequency;
158:
159: String moveTo = moveToDirectoryName;
160: String moveToPattern = getMoveToPattern();
161:
162: Map props = endpoint.getProperties();
163: if (props != null) {
164: // Override properties on the endpoint for the specific endpoint
165: String read = (String) props
166: .get(PROPERTY_READ_FROM_DIRECTORY);
167: if (read != null) {
168: readDir = read;
169: }
170: String move = (String) props
171: .get(PROPERTY_MOVE_TO_DIRECTORY);
172: if (move != null) {
173: moveTo = move;
174: }
175: String tempMoveToPattern = (String) props
176: .get(PROPERTY_MOVE_TO_PATTERN);
177: if (tempMoveToPattern != null) {
178: if (logger.isDebugEnabled()) {
179: logger.debug("set moveTo Pattern to: "
180: + tempMoveToPattern);
181: }
182: moveToPattern = tempMoveToPattern;
183: }
184:
185: String tempPolling = (String) props
186: .get(PROPERTY_POLLING_FREQUENCY);
187: if (tempPolling != null) {
188: polling = Long.parseLong(tempPolling);
189: }
190:
191: if (polling <= 0) {
192: polling = DEFAULT_POLLING_FREQUENCY;
193: }
194:
195: if (logger.isDebugEnabled()) {
196: logger.debug("set polling frequency to: " + polling);
197: }
198: String tempFileAge = (String) props.get(PROPERTY_FILE_AGE);
199: if (tempFileAge != null) {
200: try {
201: setFileAge(Long.parseLong(tempFileAge));
202: } catch (Exception ex1) {
203: logger.error("Failed to set fileAge", ex1);
204: }
205: }
206:
207: // this is surreal! what on earth was it useful for?
208: // Map srvOverride = (Map) props.get(PROPERTY_SERVICE_OVERRIDE);
209: // if (srvOverride != null)
210: // {
211: // if (serviceOverrides == null)
212: // {
213: // serviceOverrides = new Properties();
214: // }
215: // serviceOverrides.setProperty(MuleProperties.CONNECTOR_INBOUND_TRANSFORMER,
216: // NoActionTransformer.class.getName());
217: // serviceOverrides.setProperty(MuleProperties.CONNECTOR_OUTBOUND_TRANSFORMER,
218: // NoActionTransformer.class.getName());
219: // }
220:
221: }
222:
223: try {
224: return serviceDescriptor.createMessageReceiver(this ,
225: service, endpoint, new Object[] { readDir, moveTo,
226: moveToPattern, new Long(polling) });
227:
228: } catch (Exception e) {
229: throw new InitialisationException(CoreMessages
230: .failedToCreateObjectWith("Message Receiver",
231: serviceDescriptor), e, this );
232: }
233: }
234:
235: public String getProtocol() {
236: return FILE;
237: }
238:
239: public FilenameParser getFilenameParser() {
240: return filenameParser;
241: }
242:
243: public void setFilenameParser(FilenameParser filenameParser) {
244: this .filenameParser = filenameParser;
245: }
246:
247: protected void doDispose() {
248: try {
249: doStop();
250: } catch (MuleException e) {
251: logger.error(e.getMessage(), e);
252: }
253: }
254:
255: protected void doInitialise() throws InitialisationException {
256: // template method, nothing to do
257: }
258:
259: protected void doConnect() throws Exception {
260: // template method, nothing to do
261: }
262:
263: protected void doDisconnect() throws Exception {
264: // template method, nothing to do
265: }
266:
267: protected void doStart() throws MuleException {
268: // template method, nothing to do
269: }
270:
271: protected void doStop() throws MuleException {
272: if (outputStream != null) {
273: try {
274: outputStream.close();
275: } catch (IOException e) {
276: logger
277: .warn("Failed to close file output stream on stop: "
278: + e);
279: }
280: }
281: }
282:
283: /**
284: * @return Returns the moveToDirectoryName.
285: */
286: public String getMoveToDirectory() {
287: return moveToDirectoryName;
288: }
289:
290: /**
291: * @param dir The moveToDirectoryName to set.
292: */
293: public void setMoveToDirectory(String dir) {
294: this .moveToDirectoryName = dir;
295: }
296:
297: /**
298: * @return Returns the outputAppend.
299: */
300: public boolean isOutputAppend() {
301: return outputAppend;
302: }
303:
304: /**
305: * @param outputAppend The outputAppend to set.
306: */
307: public void setOutputAppend(boolean outputAppend) {
308: this .outputAppend = outputAppend;
309: }
310:
311: /**
312: * @return Returns the outputPattern.
313: */
314: public String getOutputPattern() {
315: return outputPattern;
316: }
317:
318: /**
319: * @param outputPattern The outputPattern to set.
320: */
321: public void setOutputPattern(String outputPattern) {
322: this .outputPattern = outputPattern;
323: }
324:
325: /**
326: * @return Returns the outputStream.
327: */
328: public FileOutputStream getOutputStream() {
329: return outputStream;
330: }
331:
332: /**
333: * @param outputStream The outputStream to set.
334: */
335: public void setOutputStream(FileOutputStream outputStream) {
336: this .outputStream = outputStream;
337: }
338:
339: /**
340: * @return Returns the pollingFrequency.
341: */
342: public long getPollingFrequency() {
343: return pollingFrequency;
344: }
345:
346: /**
347: * @param pollingFrequency The pollingFrequency to set.
348: */
349: public void setPollingFrequency(long pollingFrequency) {
350: this .pollingFrequency = pollingFrequency;
351: }
352:
353: /**
354: * @return Returns the fileAge.
355: */
356: public long getFileAge() {
357: return fileAge;
358: }
359:
360: public boolean getCheckFileAge() {
361: return checkFileAge;
362: }
363:
364: /**
365: * @param fileAge The fileAge in milliseconds to set.
366: */
367: public void setFileAge(long fileAge) {
368: this .fileAge = fileAge;
369: this .checkFileAge = true;
370: }
371:
372: /**
373: * @return Returns the writeToDirectory.
374: */
375: public String getWriteToDirectory() {
376: return writeToDirectoryName;
377: }
378:
379: /**
380: * @param dir The writeToDirectory to set.
381: */
382: public void setWriteToDirectory(String dir) throws IOException {
383: this .writeToDirectoryName = dir;
384: if (writeToDirectoryName != null) {
385: File writeToDirectory = FileUtils
386: .openDirectory(writeToDirectoryName);
387: if (!(writeToDirectory.canRead())
388: || !writeToDirectory.canWrite()) {
389: throw new IOException(
390: "Error on initialization, Write To directory does not exist or is not read/write");
391: }
392: }
393: }
394:
395: /**
396: * @return Returns the readFromDirectory.
397: */
398: public String getReadFromDirectory() {
399: return readFromDirectoryName;
400: }
401:
402: /**
403: * @param dir The readFromDirectory to set.
404: */
405: public void setReadFromDirectory(String dir) throws IOException {
406: this .readFromDirectoryName = dir;
407: if (readFromDirectoryName != null) {
408: File readFromDirectory = FileUtils
409: .openDirectory((readFromDirectoryName));
410: if (!readFromDirectory.canRead()) {
411: throw new IOException(
412: "Error on initialization, read from directory does not exist or is not readable");
413: }
414: }
415: }
416:
417: public boolean isSerialiseObjects() {
418: return serialiseObjects;
419: }
420:
421: public void setSerialiseObjects(boolean serialiseObjects) {
422: // set serialisable transformers on the connector if this is set
423: if (serialiseObjects) {
424: if (serviceOverrides == null) {
425: serviceOverrides = new Properties();
426: }
427: serviceOverrides.setProperty(
428: MuleProperties.CONNECTOR_INBOUND_TRANSFORMER,
429: ByteArrayToSerializable.class.getName());
430: serviceOverrides.setProperty(
431: MuleProperties.CONNECTOR_OUTBOUND_TRANSFORMER,
432: SerializableToByteArray.class.getName());
433: }
434:
435: this .serialiseObjects = serialiseObjects;
436: }
437:
438: public boolean isAutoDelete() {
439: return autoDelete;
440: }
441:
442: public void setAutoDelete(boolean autoDelete) {
443: this .autoDelete = autoDelete;
444: }
445:
446: public String getMoveToPattern() {
447: return moveToPattern;
448: }
449:
450: public void setMoveToPattern(String moveToPattern) {
451: this .moveToPattern = moveToPattern;
452: }
453:
454: /**
455: * Well get the output stream (if any) for this type of transport. Typically this
456: * will be called only when Streaming is being used on an outbound endpoint
457: *
458: * @param endpoint the endpoint that releates to this Dispatcher
459: * @param message the current message being processed
460: * @return the output stream to use for this request or null if the transport
461: * does not support streaming
462: * @throws org.mule.api.MuleException
463: */
464: public OutputStream getOutputStream(ImmutableEndpoint endpoint,
465: MuleMessage message) throws MuleException {
466: String address = endpoint.getEndpointURI().getAddress();
467: String writeToDirectory = message.getStringProperty(
468: FileConnector.PROPERTY_WRITE_TO_DIRECTORY, null);
469: if (writeToDirectory == null) {
470: writeToDirectory = getWriteToDirectory();
471: }
472: if (writeToDirectory != null) {
473: address = getFilenameParser().getFilename(message,
474: writeToDirectory);
475: }
476:
477: String filename;
478: String outPattern = (String) endpoint
479: .getProperty(FileConnector.PROPERTY_OUTPUT_PATTERN);
480: if (outPattern == null) {
481: outPattern = message.getStringProperty(
482: FileConnector.PROPERTY_OUTPUT_PATTERN, null);
483: }
484: if (outPattern == null) {
485: outPattern = getOutputPattern();
486: }
487: try {
488: if (outPattern != null) {
489: filename = generateFilename(message, outPattern);
490: } else {
491: filename = message.getStringProperty(
492: FileConnector.PROPERTY_FILENAME, null);
493: if (filename == null) {
494: filename = generateFilename(message, null);
495: }
496: }
497:
498: if (filename == null) {
499: throw new IOException("Filename is null");
500: }
501: File file = FileUtils.createFile(address + "/" + filename);
502: if (logger.isInfoEnabled()) {
503: logger.info("Writing file to: "
504: + file.getAbsolutePath());
505: }
506:
507: return new FileOutputStream(file, MapUtils.getBooleanValue(
508: endpoint.getProperties(), "outputAppend",
509: isOutputAppend()));
510: } catch (IOException e) {
511: throw new DispatchException(CoreMessages
512: .streamingFailedNoStream(), message, endpoint, e);
513: }
514: }
515:
516: private String generateFilename(MuleMessage message, String pattern) {
517: if (pattern == null) {
518: pattern = getOutputPattern();
519: }
520: return getFilenameParser().getFilename(message, pattern);
521: }
522:
523: public boolean isStreaming() {
524: return streaming;
525: }
526:
527: public void setStreaming(boolean streaming) {
528: this .streaming = streaming;
529: }
530:
531: public MessageAdapter getMessageAdapter(Object message)
532: throws MessagingException {
533: if (isStreaming()) {
534: // TODO Shouldn't we have a way to specify MessageAdaptor for streaming
535: // in service descriptor
536: return new FileMessageAdapter(message);
537: } else {
538: return super.getMessageAdapter(message);
539: }
540: }
541:
542: }
|