001: /*
002: * Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
003: * (http://h2database.com/html/license.html).
004: * Initial Developer: H2 Group
005: */
006: package org.h2.server.ftp;
007:
008: import java.io.IOException;
009: import java.io.InputStream;
010: import java.io.OutputStream;
011: import java.net.InetAddress;
012: import java.net.ServerSocket;
013: import java.net.Socket;
014: import java.sql.SQLException;
015:
016: import org.h2.store.fs.FileSystem;
017: import org.h2.util.IOUtils;
018:
019: /**
020: * The implementation of the data channel of the FTP server.
021: */
022: public class FtpData extends Thread {
023:
024: private FtpServer server;
025: private InetAddress address;
026: private ServerSocket serverSocket;
027: private volatile Socket socket;
028: private boolean active;
029: private int port;
030:
031: public FtpData(FtpServer server, InetAddress address,
032: ServerSocket serverSocket) throws IOException {
033: this .server = server;
034: this .address = address;
035: this .serverSocket = serverSocket;
036: }
037:
038: public FtpData(FtpServer server, InetAddress address, int port)
039: throws IOException {
040: this .server = server;
041: this .address = address;
042: this .port = port;
043: active = true;
044: }
045:
046: public void run() {
047: try {
048: synchronized (this ) {
049: Socket s = serverSocket.accept();
050: if (s.getInetAddress().equals(address)) {
051: server.log("Data connected:" + s.getInetAddress()
052: + " expected:" + address);
053: socket = s;
054: notifyAll();
055: } else {
056: server.log("Data REJECTED:" + s.getInetAddress()
057: + " expected:" + address);
058: close();
059: }
060: }
061: } catch (IOException e) {
062: e.printStackTrace();
063: }
064: }
065:
066: private void connect() throws IOException {
067: if (active) {
068: socket = new Socket(address, port);
069: } else {
070: waitUntilConnected();
071: }
072: }
073:
074: private void waitUntilConnected() {
075: while (serverSocket != null && socket == null) {
076: try {
077: wait();
078: } catch (InterruptedException e) {
079: // ignore
080: }
081: }
082: server.log("connected");
083: }
084:
085: public void close() {
086: serverSocket = null;
087: socket = null;
088: }
089:
090: public synchronized void receive(FileSystem fs, String fileName)
091: throws IOException, SQLException {
092: connect();
093: try {
094: InputStream in = socket.getInputStream();
095: OutputStream out = fs.openFileOutputStream(fileName, false);
096: IOUtils.copy(in, out);
097: out.close();
098: } finally {
099: socket.close();
100: }
101: server.log("closed");
102: }
103:
104: public synchronized void send(FileSystem fs, String fileName,
105: long skip) throws IOException {
106: connect();
107: try {
108: OutputStream out = socket.getOutputStream();
109: InputStream in = fs.openFileInputStream(fileName);
110: IOUtils.skipFully(in, skip);
111: IOUtils.copy(in, out);
112: in.close();
113: } finally {
114: socket.close();
115: }
116: server.log("closed");
117: }
118:
119: public synchronized void send(byte[] data) throws IOException {
120: connect();
121: try {
122: OutputStream out = socket.getOutputStream();
123: out.write(data);
124: } finally {
125: socket.close();
126: }
127: server.log("closed");
128: }
129:
130: }
|