001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.truezip;
018:
019: import java.io.BufferedInputStream;
020: import java.io.FileFilter;
021: import java.io.IOException;
022: import java.io.InputStream;
023: import java.util.concurrent.locks.Lock;
024:
025: import javax.jbi.management.DeploymentException;
026: import javax.jbi.messaging.InOnly;
027: import javax.jbi.messaging.MessageExchange;
028: import javax.jbi.messaging.NormalizedMessage;
029: import javax.jbi.servicedesc.ServiceEndpoint;
030: import javax.xml.namespace.QName;
031:
032: import de.schlichtherle.io.File;
033: import de.schlichtherle.io.FileInputStream;
034:
035: import org.apache.servicemix.common.DefaultComponent;
036: import org.apache.servicemix.common.ServiceUnit;
037: import org.apache.servicemix.common.endpoints.PollingEndpoint;
038: import org.apache.servicemix.components.util.DefaultFileMarshaler;
039: import org.apache.servicemix.components.util.FileMarshaler;
040: import org.apache.servicemix.locks.LockManager;
041: import org.apache.servicemix.locks.impl.SimpleLockManager;
042:
043: /**
044: * A polling endpoint which looks for a file or files in a directory and sends
045: * the files into the JBI bus as messages, deleting the files by default when
046: * they are processed.
047: *
048: * @org.apache.xbean.XBean element="poller"
049: *
050: * @version $Revision: 557978 $
051: */
052: public class TrueZipPollerEndpoint extends PollingEndpoint implements
053: TrueZipEndpointType {
054:
055: private File file;
056:
057: private FileFilter filter;
058:
059: private boolean deleteFile = true;
060:
061: private boolean recursive = true;
062:
063: private boolean autoCreateDirectory = true;
064:
065: private FileMarshaler marshaler = new DefaultFileMarshaler();
066:
067: private LockManager lockManager;
068:
069: public TrueZipPollerEndpoint() {
070: }
071:
072: public TrueZipPollerEndpoint(ServiceUnit serviceUnit,
073: QName service, String endpoint) {
074: super (serviceUnit, service, endpoint);
075: }
076:
077: public TrueZipPollerEndpoint(DefaultComponent component,
078: ServiceEndpoint endpoint) {
079: super (component, endpoint);
080: }
081:
082: public void poll() throws Exception {
083: pollFileOrDirectory(file);
084: }
085:
086: public void validate() throws DeploymentException {
087: super .validate();
088: if (file == null) {
089: throw new DeploymentException(
090: "You must specify a file property");
091: }
092: if (isAutoCreateDirectory() && !file.exists()) {
093: file.mkdirs();
094: }
095: if (lockManager == null) {
096: lockManager = createLockManager();
097: }
098: }
099:
100: protected LockManager createLockManager() {
101: return new SimpleLockManager();
102: }
103:
104: // Properties
105: // -------------------------------------------------------------------------
106: public java.io.File getFile() {
107: return file;
108: }
109:
110: /**
111: * Sets the file to poll, which can be a directory or a file.
112: *
113: * @param file
114: */
115: public void setFile(java.io.File file) {
116: this .file = new File(file);
117: }
118:
119: /**
120: * @return the lockManager
121: */
122: public LockManager getLockManager() {
123: return lockManager;
124: }
125:
126: /**
127: * @param lockManager
128: * the lockManager to set
129: */
130: public void setLockManager(LockManager lockManager) {
131: this .lockManager = lockManager;
132: }
133:
134: public FileFilter getFilter() {
135: return filter;
136: }
137:
138: /**
139: * Sets the optional filter to choose which files to process
140: */
141: public void setFilter(FileFilter filter) {
142: this .filter = filter;
143: }
144:
145: /**
146: * Returns whether or not we should delete the file when its processed
147: */
148: public boolean isDeleteFile() {
149: return deleteFile;
150: }
151:
152: public void setDeleteFile(boolean deleteFile) {
153: this .deleteFile = deleteFile;
154: }
155:
156: public boolean isRecursive() {
157: return recursive;
158: }
159:
160: public void setRecursive(boolean recursive) {
161: this .recursive = recursive;
162: }
163:
164: public boolean isAutoCreateDirectory() {
165: return autoCreateDirectory;
166: }
167:
168: public void setAutoCreateDirectory(boolean autoCreateDirectory) {
169: this .autoCreateDirectory = autoCreateDirectory;
170: }
171:
172: public FileMarshaler getMarshaler() {
173: return marshaler;
174: }
175:
176: public void setMarshaler(FileMarshaler marshaler) {
177: this .marshaler = marshaler;
178: }
179:
180: // Implementation methods
181: // -------------------------------------------------------------------------
182:
183: protected void pollFileOrDirectory(File fileOrDirectory) {
184: pollFileOrDirectory(fileOrDirectory, true);
185: }
186:
187: protected void pollFileOrDirectory(File fileOrDirectory,
188: boolean processDir) {
189: if (!fileOrDirectory.isDirectory()) {
190: pollFile(fileOrDirectory); // process the file
191: } else if (processDir) {
192: logger.debug("Polling directory " + fileOrDirectory);
193: File[] files = (File[]) fileOrDirectory
194: .listFiles(getFilter());
195: for (int i = 0; i < files.length; i++) {
196: pollFileOrDirectory(files[i], isRecursive()); // self-recursion
197: }
198: } else {
199: logger.debug("Skipping directory " + fileOrDirectory);
200: }
201: }
202:
203: protected void pollFile(final File aFile) {
204: if (logger.isDebugEnabled()) {
205: logger
206: .debug("Scheduling file " + aFile
207: + " for processing");
208: }
209: getExecutor().execute(new Runnable() {
210: public void run() {
211: String uri = file.toURI().relativize(aFile.toURI())
212: .toString();
213: Lock lock = lockManager.getLock(uri);
214: if (lock.tryLock()) {
215: try {
216: processFileAndDelete(aFile);
217: } finally {
218: lock.unlock();
219: }
220: } else {
221: if (logger.isDebugEnabled()) {
222: logger.debug("Unable to acquire lock on "
223: + aFile);
224: }
225: }
226: }
227: });
228: }
229:
230: protected void processFileAndDelete(File aFile) {
231: try {
232: if (logger.isDebugEnabled()) {
233: logger.debug("Processing file " + aFile);
234: }
235: if (aFile.exists()) {
236: processFile(aFile);
237: if (isDeleteFile() && !aFile.delete()) {
238: throw new IOException("Could not delete file "
239: + aFile);
240: }
241: }
242: } catch (Exception e) {
243: logger.error("Failed to process file: " + aFile
244: + ". Reason: " + e, e);
245: }
246: }
247:
248: protected void processFile(File aFile) throws Exception {
249: String name = aFile.getCanonicalPath();
250: InputStream in = new BufferedInputStream(new FileInputStream(
251: aFile));
252: InOnly exchange = getExchangeFactory().createInOnlyExchange();
253: configureExchangeTarget(exchange);
254: NormalizedMessage message = exchange.createMessage();
255: exchange.setInMessage(message);
256: marshaler.readMessage(exchange, message, in, name);
257: sendSync(exchange);
258: in.close();
259: }
260:
261: public String getLocationURI() {
262: return file.toURI().toString();
263: }
264:
265: public void process(MessageExchange exchange) throws Exception {
266: // Do nothing. In our case, this method should never be called
267: // as we only send synchronous InOnly exchange
268: }
269: }
|