001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.synapse.transport.vfs;
020:
021: import org.apache.synapse.transport.base.BaseConstants;
022: import org.apache.synapse.transport.base.BaseUtils;
023: import org.apache.synapse.transport.base.AbstractPollingTransportListener;
024: import org.apache.axis2.addressing.EndpointReference;
025: import org.apache.axis2.AxisFault;
026: import org.apache.axis2.Constants;
027: import org.apache.axis2.description.*;
028: import org.apache.axis2.context.ConfigurationContext;
029: import org.apache.axis2.context.MessageContext;
030: import org.apache.commons.vfs.*;
031: import org.apache.commons.vfs.impl.StandardFileSystemManager;
032: import org.apache.commons.logging.LogFactory;
033:
034: import javax.xml.namespace.QName;
035: import java.util.*;
036: import java.io.File;
037: import java.text.DateFormat;
038: import java.text.SimpleDateFormat;
039:
040: /**
041: * The "vfs" transport is a polling based transport - i.e. it gets kicked off at
042: * specified periodic durations, and would iterate through a list of directories or files
043: * specified according to poll durations. When scanning a directory, it will match
044: * its contents against a given regex to find the set of input files. For compressed
045: * files, the contents could be matched against a regex to find individual files.
046: * Each of these files thus found would be submitted as an Axis2 "message" into the
047: * Axis2 engine.
048: *
049: * The processed files would be deleted or renamed as specified in the configuration
050: *
051: * Supported VFS example URIs
052: *
053: * file:///directory/filename.ext
054: * file:////somehost/someshare/afile.txt
055: * jar:../lib/classes.jar!/META-INF/manifest.mf
056: * zip:http://somehost/downloads/somefile.zip
057: * jar:zip:outer.zip!/nested.jar!/somedir
058: * jar:zip:outer.zip!/nested.jar!/some%21dir
059: * tar:gz:http://anyhost/dir/mytar.tar.gz!/mytar.tar!/path/in/tar/README.txt
060: * tgz:file://anyhost/dir/mytar.tgz!/somepath/somefile
061: * gz:/my/gz/file.gz
062: * http://somehost:8080/downloads/somefile.jar
063: * http://myusername@somehost/index.html
064: * webdav://somehost:8080/dist
065: * ftp://myusername:mypassword@somehost/pub/downloads/somefile.tgz[?passive=true]
066: * sftp://myusername:mypassword@somehost/pub/downloads/somefile.tgz
067: * smb://somehost/home
068: *
069: * axis2.xml - transport definition
070: * <transportReceiver name="file" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>
071: *
072: * services.xml - service attachment
073: * required parameters
074: * <parameter name="transport.vfs.FileURI">..</parameter>
075: * <parameter name="transport.vfs.ContentType">..</parameter>
076: *
077: * optional parameters
078: * <parameter name="transport.vfs.FileNamePattern">..</parameter>
079: * <parameter name="transport.PollInterval">..</parameter>
080: *
081: * <parameter name="transport.vfs.ActionAfterProcess">..</parameter>
082: * <parameter name="transport.vfs.ActionAfterErrors" >..</parameter>
083: * <parameter name="transport.vfs.ActionAfterFailure">..</parameter>
084: *
085: * <parameter name="transport.vfs.ReplyFileURI" >..</parameter>
086: * <parameter name="transport.vfs.ReplyFileName">..</parameter>
087: *
088: * FTP testing URIs
089: * ftp://ftpuser:password@asankha/somefile.csv?passive=true
090: * ftp://vfs:apache@vfs.netfirms.com/somepath/somefile.xml?passive=true
091: */
092: public class VFSTransportListener extends
093: AbstractPollingTransportListener {
094:
095: public static final String TRANSPORT_NAME = "vfs";
096:
097: public static final String DELETE = "DELETE";
098: public static final String MOVE = "MOVE";
099:
100: /** Keep the list of directories/files and poll durations */
101: private final List pollTable = new ArrayList();
102: /** The VFS file system manager */
103: private FileSystemManager fsManager = null;
104:
105: /**
106: * Initializes the VFS transport by getting the VFS File System manager
107: * @param cfgCtx the Axsi2 configuration context
108: * @param trpInDesc the VFS transport in description from the axis2.xml
109: * @throws AxisFault on error
110: */
111: public void init(ConfigurationContext cfgCtx,
112: TransportInDescription trpInDesc) throws AxisFault {
113: setTransportName(TRANSPORT_NAME);
114: super .init(cfgCtx, trpInDesc);
115: try {
116: StandardFileSystemManager fsm = new StandardFileSystemManager();
117: fsm.setConfiguration(getClass().getClassLoader()
118: .getResource("providers.xml"));
119: fsm.init();
120: fsManager = fsm;
121: } catch (FileSystemException e) {
122: handleException("Error initializing the file transport : "
123: + e.getMessage(), e);
124: }
125: }
126:
127: /**
128: * On a poller tick, iterate over the list of directories/files and check if
129: * it is time to scan the contents for new files
130: */
131: public void onPoll() {
132: Iterator iter = pollTable.iterator();
133: while (iter.hasNext()) {
134: PollTableEntry entry = (PollTableEntry) iter.next();
135: long startTime = System.currentTimeMillis();
136:
137: if (startTime > entry.getNextPollTime()) {
138: scanFileOrDirectory(entry, entry.getFileURI());
139: }
140: }
141: }
142:
143: /**
144: * Search for files that match the given regex pattern and create a list
145: * Then process each of these files and update the status of the scan on
146: * the poll table
147: * @param entry the poll table entry for the scan
148: * @param fileURI the file or directory to be scanned
149: */
150: private void scanFileOrDirectory(final PollTableEntry entry,
151: String fileURI) {
152:
153: FileObject fileObject = null;
154:
155: if (log.isDebugEnabled()) {
156: log.debug("Scanning directory or file : " + fileURI);
157: }
158:
159: boolean wasError = true;
160: int retryCount = 0;
161: int maxRetryCount = entry.getMaxRetryCount();
162: long reconnectionTimeout = entry.getReconnectTimeout();
163:
164: while (wasError == true) {
165: try {
166: retryCount++;
167: fileObject = fsManager.resolveFile(fileURI);
168:
169: if (fileObject == null) {
170: log.error("fileObject is null");
171: throw new FileSystemException("fileObject is null");
172: }
173:
174: wasError = false;
175:
176: } catch (FileSystemException e) {
177: log.error("cannot resolve fileObject", e);
178: if (maxRetryCount <= retryCount)
179: processFailure(
180: "cannot resolve fileObject repeatedly: "
181: + e.getMessage(), e, entry);
182: return;
183: }
184:
185: if (wasError == true) {
186: try {
187: Thread.sleep(reconnectionTimeout);
188: } catch (InterruptedException e2) {
189: e2.printStackTrace();
190: }
191: }
192: }
193:
194: try {
195: if (fileObject.exists() && fileObject.isReadable()) {
196:
197: entry.setLastPollState(PollTableEntry.NONE);
198: FileObject[] children = null;
199: try {
200: children = fileObject.getChildren();
201: } catch (FileSystemException ignore) {
202: }
203:
204: // if this is a file that would translate to a single message
205: if (children == null || children.length == 0) {
206:
207: if (fileObject.getType() == FileType.FILE) {
208: try {
209: processFile(entry, fileObject);
210: entry
211: .setLastPollState(PollTableEntry.SUCCSESSFUL);
212:
213: } catch (AxisFault e) {
214: entry
215: .setLastPollState(PollTableEntry.FAILED);
216: }
217:
218: moveOrDeleteAfterProcessing(entry, fileObject);
219: }
220:
221: } else {
222: int failCount = 0;
223: int successCount = 0;
224:
225: if (log.isDebugEnabled()) {
226: log.debug("File name pattern :"
227: + entry.getFileNamePattern());
228: }
229: for (int i = 0; i < children.length; i++) {
230: if (log.isDebugEnabled()) {
231: log.debug("Matching file :"
232: + children[i].getName()
233: .getBaseName());
234: }
235: if ((entry.getFileNamePattern() != null)
236: && (children[i].getName().getBaseName()
237: .matches(entry
238: .getFileNamePattern()))) {
239: try {
240: if (log.isDebugEnabled()) {
241: log.debug("Processing file :"
242: + children[i]);
243: }
244: processFile(entry, children[i]);
245: successCount++;
246: // tell moveOrDeleteAfterProcessing() file was success
247: entry
248: .setLastPollState(PollTableEntry.SUCCSESSFUL);
249:
250: } catch (Exception e) {
251: logException(
252: "Error processing File URI : "
253: + children[i].getName(),
254: e);
255: failCount++;
256: // tell moveOrDeleteAfterProcessing() file failed
257: entry
258: .setLastPollState(PollTableEntry.FAILED);
259: }
260:
261: moveOrDeleteAfterProcessing(entry,
262: children[i]);
263: }
264: }
265:
266: if (failCount == 0 && successCount > 0) {
267: entry
268: .setLastPollState(PollTableEntry.SUCCSESSFUL);
269: } else if (successCount == 0 && failCount > 0) {
270: entry.setLastPollState(PollTableEntry.FAILED);
271: } else {
272: entry
273: .setLastPollState(PollTableEntry.WITH_ERRORS);
274: }
275: }
276:
277: // processing of this poll table entry is complete
278: long now = System.currentTimeMillis();
279: entry.setLastPollTime(now);
280: entry.setNextPollTime(now + entry.getPollInterval());
281:
282: } else {
283: if (log.isDebugEnabled()) {
284: log
285: .debug("Unable to access or read file or directory : "
286: + fileURI);
287: }
288: }
289:
290: } catch (FileSystemException e) {
291: processFailure(
292: "Error checking for existence and readability : "
293: + fileURI, e, entry);
294: }
295:
296: }
297:
298: /**
299: * Take specified action to either move or delete the processed file, depending on the outcome
300: * @param entry the PollTableEntry for the file that has been processed
301: * @param fileObject the FileObject representing the file to be moved or deleted
302: */
303: private void moveOrDeleteAfterProcessing(
304: final PollTableEntry entry, FileObject fileObject) {
305:
306: String moveToDirectory = null;
307: try {
308: switch (entry.getLastPollState()) {
309: case PollTableEntry.SUCCSESSFUL:
310: if (entry.getActionAfterProcess() == PollTableEntry.MOVE) {
311: moveToDirectory = entry.getMoveAfterProcess();
312: }
313: break;
314:
315: case PollTableEntry.WITH_ERRORS:
316: if (entry.getActionAfterProcess() == PollTableEntry.MOVE) {
317: moveToDirectory = entry.getMoveAfterErrors();
318: }
319: break;
320:
321: case PollTableEntry.FAILED:
322: if (entry.getActionAfterProcess() == PollTableEntry.MOVE) {
323: moveToDirectory = entry.getMoveAfterFailure();
324: }
325: break;
326: case PollTableEntry.NONE:
327: return;
328: }
329:
330: if (moveToDirectory != null) {
331: String prefix = "";
332: if (entry.getMoveTimestampFormat() != null) {
333: Date now = new Date();
334: prefix = entry.getMoveTimestampFormat().format(now);
335: }
336: String destName = moveToDirectory + File.separator
337: + prefix + fileObject.getName().getBaseName();
338: if (log.isDebugEnabled()) {
339: log.debug("Moving to file :" + destName);
340: }
341: FileObject dest = fsManager.resolveFile(destName);
342: try {
343: fileObject.moveTo(dest);
344: } catch (FileSystemException e) {
345: log.error("Error moving file : " + fileObject
346: + " to " + moveToDirectory, e);
347: }
348: } else {
349: try {
350: if (log.isDebugEnabled()) {
351: log.debug("Deleting file :" + fileObject);
352: }
353: fileObject.close();
354: if (!fileObject.delete()) {
355: log.error("Cannot delete file : " + fileObject);
356: }
357: } catch (FileSystemException e) {
358: log.error("Error deleting file : " + fileObject, e);
359: }
360: }
361:
362: } catch (FileSystemException e) {
363: log.error(
364: "Error resolving directory to move after processing : "
365: + moveToDirectory, e);
366: }
367: }
368:
369: /**
370: * Process a single file through Axis2
371: * @param entry the PollTableEntry for the file (or its parent directory or archive)
372: * @param file the file that contains the actual message pumped into Axis2
373: * @throws AxisFault on error
374: */
375: private void processFile(PollTableEntry entry, FileObject file)
376: throws AxisFault {
377:
378: try {
379: FileContent content = file.getContent();
380: String fileName = file.getName().getBaseName();
381: String filePath = file.getName().getPath();
382:
383: Map transportHeaders = new HashMap();
384: transportHeaders.put(VFSConstants.FILE_PATH, filePath);
385: transportHeaders.put(VFSConstants.FILE_NAME, fileName);
386:
387: try {
388: transportHeaders.put(VFSConstants.FILE_LENGTH, Long
389: .valueOf(content.getSize()));
390: } catch (FileSystemException ignore) {
391: }
392: try {
393: transportHeaders.put(VFSConstants.LAST_MODIFIED, Long
394: .valueOf(content.getLastModifiedTime()));
395: } catch (FileSystemException ignore) {
396: }
397:
398: // compute the unique message ID
399: String messageId = filePath + "_" + fileName + "_"
400: + System.currentTimeMillis() + "_"
401: + (int) Math.random() * 1000;
402:
403: String contentType = entry.getContentType();
404: if (!BaseUtils.isValid(contentType)) {
405: if (file.getName().getExtension().toLowerCase()
406: .endsWith(".xml")) {
407: contentType = "text/xml";
408: } else if (file.getName().getExtension().toLowerCase()
409: .endsWith(".txt")) {
410: contentType = "text/plain";
411: }
412: }
413:
414: // if the content type was not found, but the service defined it.. use it
415: if (contentType == null) {
416: if (entry.getContentType() != null) {
417: contentType = entry.getContentType();
418: } else if (VFSUtils.getInstace().getProperty(content,
419: BaseConstants.CONTENT_TYPE) != null) {
420: contentType = VFSUtils.getInstace().getProperty(
421: content, BaseConstants.CONTENT_TYPE);
422: }
423: }
424:
425: MessageContext msgContext = createMessageContext();
426: // set to bypass dispatching if we know the service - we already should!
427: AxisService service = cfgCtx.getAxisConfiguration()
428: .getService(entry.getServiceName());
429: msgContext.setAxisService(service);
430:
431: // find the operation for the message, or default to one
432: Parameter operationParam = service
433: .getParameter(BaseConstants.OPERATION_PARAM);
434: QName operationQName = (operationParam != null ? BaseUtils
435: .getQNameFromString(operationParam.getValue())
436: : BaseConstants.DEFAULT_OPERATION);
437:
438: AxisOperation operation = service
439: .getOperation(operationQName);
440: if (operation != null) {
441: msgContext.setAxisOperation(operation);
442: }
443:
444: // does the service specify a default reply file URI ?
445: Parameter param = service
446: .getParameter(VFSConstants.REPLY_FILE_URI);
447: if (param != null && param.getValue() != null) {
448: msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
449: new VFSOutTransportInfo((String) param
450: .getValue()));
451: }
452:
453: // set the message payload to the message context
454: VFSUtils.getInstace().setSOAPEnvelope(content, msgContext,
455: contentType);
456:
457: handleIncomingMessage(msgContext, transportHeaders, null, //* SOAP Action - not applicable *//
458: contentType);
459:
460: if (log.isDebugEnabled()) {
461: log.debug("Processed file : " + file
462: + " of Content-type : " + contentType);
463: }
464:
465: } catch (FileSystemException e) {
466: handleException(
467: "Error reading file content or attributes : "
468: + file, e);
469:
470: } finally {
471: try {
472: file.close();
473: } catch (FileSystemException warn) {
474: log.warn("Cannot close file after processing : "
475: + file.getName().getPath(), warn);
476: }
477: }
478: }
479:
480: /**
481: * method to log a failure to the log file and to update the last poll status and time
482: * @param msg text for the log message
483: * @param e optiona exception encountered or null
484: * @param entry the PollTableEntry
485: */
486: private void processFailure(String msg, Exception e,
487: PollTableEntry entry) {
488: if (e == null) {
489: log.error(msg);
490: } else {
491: log.error(msg, e);
492: }
493: long now = System.currentTimeMillis();
494: entry.setLastPollState(PollTableEntry.FAILED);
495: entry.setLastPollTime(now);
496: entry.setNextPollTime(now + entry.getPollInterval());
497: }
498:
499: /**
500: * Get the EPR for the given service over the VFS transport
501: * vfs:uri (@see http://jakarta.apache.org/commons/vfs/filesystems.html for the URI formats)
502: * @param serviceName service name
503: * @param ip ignored
504: * @return the EPR for the service
505: * @throws AxisFault not used
506: */
507: public EndpointReference[] getEPRsForService(String serviceName,
508: String ip) throws AxisFault {
509: Iterator iter = pollTable.iterator();
510: while (iter.hasNext()) {
511: PollTableEntry entry = (PollTableEntry) iter.next();
512: if (entry.getServiceName().equals(serviceName)) {
513: return new EndpointReference[] { new EndpointReference(
514: "vfs:" + entry.getFileURI()) };
515: }
516: }
517: return null;
518: }
519:
520: protected void startListeningForService(AxisService service) {
521:
522: Parameter param = service
523: .getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
524: long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
525: if (param != null && param.getValue() instanceof String) {
526: try {
527: pollInterval = Integer.parseInt(param.getValue()
528: .toString());
529: } catch (NumberFormatException e) {
530: log.error("Invalid poll interval : " + param.getValue()
531: + " for service : " + service.getName()
532: + " default to : "
533: + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000)
534: + "sec", e);
535: }
536: }
537:
538: PollTableEntry entry = new PollTableEntry();
539: try {
540: entry.setFileURI(BaseUtils.getRequiredServiceParam(service,
541: VFSConstants.TRANSPORT_FILE_FILE_URI));
542: entry.setFileNamePattern(BaseUtils.getOptionalServiceParam(
543: service,
544: VFSConstants.TRANSPORT_FILE_FILE_NAME_PATTERN));
545: entry.setContentType(BaseUtils.getRequiredServiceParam(
546: service, VFSConstants.TRANSPORT_FILE_CONTENT_TYPE));
547: String option = BaseUtils.getOptionalServiceParam(service,
548: VFSConstants.TRANSPORT_FILE_ACTION_AFTER_PROCESS);
549: entry
550: .setActionAfterProcess(MOVE.equals(option) ? PollTableEntry.MOVE
551: : PollTableEntry.DELETE);
552: option = BaseUtils.getOptionalServiceParam(service,
553: VFSConstants.TRANSPORT_FILE_ACTION_AFTER_ERRORS);
554: entry
555: .setActionAfterErrors(MOVE.equals(option) ? PollTableEntry.MOVE
556: : PollTableEntry.DELETE);
557: option = BaseUtils.getOptionalServiceParam(service,
558: VFSConstants.TRANSPORT_FILE_ACTION_AFTER_FAILURE);
559: entry
560: .setActionAfterFailure(MOVE.equals(option) ? PollTableEntry.MOVE
561: : PollTableEntry.DELETE);
562:
563: String moveDirectoryAfterProcess = BaseUtils
564: .getOptionalServiceParam(
565: service,
566: VFSConstants.TRANSPORT_FILE_MOVE_AFTER_PROCESS);
567: entry.setMoveAfterProcess(moveDirectoryAfterProcess);
568: String moveDirectoryAfterErrors = BaseUtils
569: .getOptionalServiceParam(
570: service,
571: VFSConstants.TRANSPORT_FILE_MOVE_AFTER_ERRORS);
572: entry.setMoveAfterErrors(moveDirectoryAfterErrors);
573: String moveDirectoryAfterFailure = BaseUtils
574: .getOptionalServiceParam(
575: service,
576: VFSConstants.TRANSPORT_FILE_MOVE_AFTER_FAILURE);
577: entry.setMoveAfterFailure(moveDirectoryAfterFailure);
578:
579: String moveFileTimestampFormat = BaseUtils
580: .getOptionalServiceParam(
581: service,
582: VFSConstants.TRANSPORT_FILE_MOVE_TIMESTAMP_FORMAT);
583: if (moveFileTimestampFormat != null) {
584: DateFormat moveTimestampFormat = new SimpleDateFormat(
585: moveFileTimestampFormat);
586: entry.setMoveTimestampFormat(moveTimestampFormat);
587: }
588:
589: String strMaxRetryCount = BaseUtils
590: .getOptionalServiceParam(service,
591: VFSConstants.MAX_RETRY_COUNT);
592: if (strMaxRetryCount != null)
593: entry.setMaxRetryCount(Integer
594: .parseInt(strMaxRetryCount));
595:
596: String strReconnectTimeout = BaseUtils
597: .getOptionalServiceParam(service,
598: VFSConstants.RECONNECT_TIMEOUT);
599: if (strReconnectTimeout != null)
600: entry.setReconnectTimeout(Integer
601: .parseInt(strReconnectTimeout) * 1000);
602:
603: entry.setServiceName(service.getName());
604: schedulePoll(service, pollInterval);
605: pollTable.add(entry);
606:
607: } catch (AxisFault axisFault) {
608: String msg = "Error configuring the File/VFS transport for Service : "
609: + service.getName()
610: + " :: "
611: + axisFault.getMessage();
612: log.warn(msg);
613: //cfgCtx.getAxisConfiguration().getFaultyServices().put(service.getName(), msg);
614: }
615: }
616:
617: protected void stopListeningForService(AxisService service) {
618: Iterator iter = pollTable.iterator();
619: while (iter.hasNext()) {
620: PollTableEntry entry = (PollTableEntry) iter.next();
621: if (service.getName().equals(entry.getServiceName())) {
622: cancelPoll(service);
623: pollTable.remove(entry);
624: }
625: }
626: }
627: }
|