001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.ftp;
018:
019: import java.io.IOException;
020: import java.io.OutputStream;
021: import java.net.URI;
022:
023: import javax.jbi.JBIException;
024: import javax.jbi.management.DeploymentException;
025: import javax.jbi.messaging.MessageExchange;
026: import javax.jbi.messaging.NormalizedMessage;
027: import javax.jbi.servicedesc.ServiceEndpoint;
028:
029: import org.apache.commons.net.ftp.FTPClient;
030: import org.apache.servicemix.common.endpoints.ProviderEndpoint;
031: import org.apache.servicemix.components.util.DefaultFileMarshaler;
032: import org.apache.servicemix.components.util.FileMarshaler;
033:
034: /**
035: * An FTP endpoint
036: *
037: * @version $Revision: $
038: * @org.apache.xbean.XBean element="sender"
039: */
040: public class FtpSenderEndpoint extends ProviderEndpoint implements
041: FtpEndpointType {
042:
043: private FTPClientPool clientPool;
044: private FileMarshaler marshaler = new DefaultFileMarshaler();
045: private String uniqueFileName = "ServiceMix";
046: private boolean overwrite;
047: private URI uri;
048: private String uploadSuffix;
049: private boolean checkDuplicates = true;
050:
051: public FtpSenderEndpoint() {
052: }
053:
054: public FtpSenderEndpoint(FtpComponent component,
055: ServiceEndpoint endpoint) {
056: super (component, endpoint);
057: }
058:
059: public void validate() throws DeploymentException {
060: super .validate();
061: if (uri == null
062: && (getClientPool() == null || getClientPool()
063: .getHost() == null)) {
064: throw new DeploymentException(
065: "Property uri or clientPool.host must be configured");
066: }
067: if (uri != null && getClientPool() != null
068: && getClientPool().getHost() != null) {
069: throw new DeploymentException(
070: "Properties uri and clientPool.host can not be configured at the same time");
071: }
072: }
073:
074: /**
075: * Configures the endpoint from a URI
076: */
077: public void setUri(URI uri) {
078: this .uri = uri;
079: }
080:
081: public boolean isCheckDuplicates() {
082: return checkDuplicates;
083: }
084:
085: public void setCheckDuplicates(boolean checkDuplicates) {
086: this .checkDuplicates = checkDuplicates;
087: }
088:
089: public void start() throws Exception {
090: super .start();
091: if (clientPool == null) {
092: clientPool = createClientPool();
093: }
094: if (uri != null) {
095: clientPool.setHost(uri.getHost());
096: clientPool.setPort(uri.getPort());
097: if (uri.getUserInfo() != null) {
098: String[] infos = uri.getUserInfo().split(":");
099: clientPool.setUsername(infos[0]);
100: if (infos.length > 1) {
101: clientPool.setPassword(infos[1]);
102: }
103: }
104: }
105: }
106:
107: // Properties
108: //-------------------------------------------------------------------------
109: public FTPClientPool getClientPool() {
110: return clientPool;
111: }
112:
113: public void setClientPool(FTPClientPool clientPool) {
114: this .clientPool = clientPool;
115: }
116:
117: public FileMarshaler getMarshaler() {
118: return marshaler;
119: }
120:
121: public void setMarshaler(FileMarshaler marshaler) {
122: this .marshaler = marshaler;
123: }
124:
125: public String getUniqueFileName() {
126: return uniqueFileName;
127: }
128:
129: /**
130: * Sets the name used to make a unique name if no file name is available on the message.
131: *
132: * @param uniqueFileName the new value of the unique name to use for generating unique names
133: */
134: public void setUniqueFileName(String uniqueFileName) {
135: this .uniqueFileName = uniqueFileName;
136: }
137:
138: public boolean isOverwrite() {
139: return overwrite;
140: }
141:
142: public void setOverwrite(boolean overwrite) {
143: this .overwrite = overwrite;
144: }
145:
146: public String getUploadSuffix() {
147: return uploadSuffix;
148: }
149:
150: /**
151: * Set the file name suffix used during upload. The suffix will be automatically removed as soon as the upload has completed.
152: * This allows other processes to discern completed files from files that are being uploaded.
153: *
154: * @param uploadSuffix
155: */
156: public void setUploadSuffix(String uploadSuffix) {
157: this .uploadSuffix = uploadSuffix;
158: }
159:
160: // Implementation methods
161: //-------------------------------------------------------------------------
162:
163: protected void processInOnly(MessageExchange exchange,
164: NormalizedMessage message) throws Exception {
165: FTPClient client = null;
166: OutputStream out = null;
167: String name = null;
168: String uploadName = null;
169: try {
170: client = borrowClient();
171: // Change to the directory specified by the URI path if any
172: if (uri != null && uri.getPath() != null) {
173: client.changeWorkingDirectory(uri.getPath());
174: }
175:
176: name = marshaler.getOutputName(exchange, message);
177: if (name == null) {
178: if (uniqueFileName != null) {
179: out = client.storeUniqueFileStream(uniqueFileName);
180: } else {
181: out = client.storeUniqueFileStream();
182: }
183: } else {
184: if (checkDuplicates
185: && client.listFiles(name).length > 0) {
186: if (overwrite) {
187: client.deleteFile(name);
188: } else {
189: throw new IOException(
190: "Can not send "
191: + name
192: + " : file already exists and overwrite has not been enabled");
193: }
194: }
195: uploadName = uploadSuffix == null ? name : name
196: + uploadSuffix;
197: out = client.storeFileStream(uploadName);
198: }
199: if (out == null) {
200: throw new IOException(
201: "No output stream available for output name: "
202: + uploadName
203: + ". Maybe the file already exists?");
204: }
205: marshaler.writeMessage(exchange, message, out, uploadName);
206: } finally {
207: if (out != null) {
208: try {
209: out.close();
210: client.completePendingCommand();
211: if (name != null && !name.equals(uploadName)
212: && !client.rename(uploadName, name)) {
213: throw new IOException("File " + uploadName
214: + " could not be renamed to " + name);
215: }
216: } catch (IOException e) {
217: logger.error(
218: "Caught exception while closing stream on error: "
219: + e, e);
220: }
221: }
222: returnClient(client);
223: }
224: }
225:
226: protected FTPClientPool createClientPool() throws Exception {
227: FTPClientPool pool = new FTPClientPool();
228: pool.afterPropertiesSet();
229: return pool;
230: }
231:
232: protected FTPClient borrowClient() throws JBIException {
233: try {
234: return (FTPClient) getClientPool().borrowClient();
235: } catch (Exception e) {
236: throw new JBIException(e);
237: }
238: }
239:
240: protected void returnClient(FTPClient client) {
241: if (client != null) {
242: try {
243: getClientPool().returnClient(client);
244: } catch (Exception e) {
245: logger
246: .error("Failed to return client to pool: " + e,
247: e);
248: }
249: }
250: }
251: }
|