001: package it.unimi.dsi.mg4j.index.remote;
002:
003: /*
004: * MG4J: Managing Gigabytes for Java
005: *
006: * Copyright (C) 2006-2007 Sebastiano Vigna
007: *
008: * This library is free software; you can redistribute it and/or modify it
009: * under the terms of the GNU Lesser General Public License as published by the Free
010: * Software Foundation; either version 2.1 of the License, or (at your option)
011: * any later version.
012: *
013: * This library is distributed in the hope that it will be useful, but
014: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
015: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
016: * for more details.
017: *
018: * You should have received a copy of the GNU Lesser General Public License
019: * along with this program; if not, write to the Free Software
020: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
021: *
022: */
023:
024: import it.unimi.dsi.fastutil.bytes.ByteArrays;
025: import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
026: import it.unimi.dsi.fastutil.io.RepositionableStream;
027: import it.unimi.dsi.Util;
028:
029: import java.io.EOFException;
030: import java.io.IOException;
031: import java.io.InputStream;
032: import java.net.Socket;
033: import java.net.SocketAddress;
034:
035: import org.apache.log4j.Logger;
036:
037: /** A client class that connects to an {@link it.unimi.dsi.mg4j.index.remote.IndexServer}
038: * and exposes a remote {@link java.io.InputStream} locally.
039: *
040: * @author Alessandro Arrabito
041: */
042: public class RemoteInputStream extends InputStream implements
043: RepositionableStream {
044: private static final Logger LOGGER = Util
045: .getLogger(RemoteInputStream.class);
046:
047: private static final byte READ_ARRAY = 0;
048: private static final byte READ_BYTE = 1;
049: private static final byte SET_POSITION = 2;
050: private static final byte READ_POSITION = 3;
051: private static final byte AVAILABLE = 4;
052: private static final byte SKIP = 5;
053: private static final byte CLOSE = 6;
054:
055: /** The connection to the server. */
056: final private RemoteIndexServerConnection connection;
057:
058: /** Creates a new client input stream using a given socket address.
059: * @param address the address of the index server.
060: */
061: public RemoteInputStream(final SocketAddress address)
062: throws IOException {
063: connection = new RemoteIndexServerConnection(address,
064: IndexServer.GET_CLIENT_INPUT_STREAM);
065: }
066:
067: public int available() throws IOException {
068: connection.outputStream.writeByte(RemoteInputStream.AVAILABLE);
069: connection.outputStream.flush();
070: return connection.inputStream.readInt();
071: }
072:
073: public void close() throws IOException {
074: connection.outputStream.writeByte(RemoteInputStream.CLOSE);
075: connection.outputStream.flush();
076: try {
077: connection.close();
078: } catch (IOException dontCare) {
079: // Whatever may happen, we're so outta here...
080: }
081: }
082:
083: public int read(final byte[] array, final int offset,
084: final int length) throws IOException {
085: if (length == 0)
086: return 0;
087: ByteArrays.ensureOffsetLength(array, offset, length);
088: connection.outputStream.writeByte(RemoteInputStream.READ_ARRAY);
089: connection.outputStream.writeInt(length);
090: connection.outputStream.flush();
091: final int result = connection.inputStream.readInt();
092: if (result <= 0)
093: return result;
094: connection.inputStream.read(array, offset, result);
095: return result;
096: }
097:
098: public long skip(final long toSkip) throws IOException {
099: if (toSkip < 0)
100: throw new IOException("Negative skip: " + toSkip);
101: connection.outputStream.writeByte(RemoteInputStream.SKIP);
102: connection.outputStream.writeLong(toSkip);
103: connection.outputStream.flush();
104: return connection.inputStream.readLong();
105: }
106:
107: public int read() throws IOException {
108: connection.outputStream.writeByte(RemoteInputStream.READ_BYTE);
109: connection.outputStream.flush();
110: return connection.inputStream.readInt();
111: }
112:
113: public void position(final long newPosition) throws IOException {
114: connection.outputStream
115: .writeByte(RemoteInputStream.SET_POSITION);
116: connection.outputStream.writeLong(newPosition);
117: connection.outputStream.flush();
118: }
119:
120: public long position() throws IOException {
121: connection.outputStream
122: .writeByte(RemoteInputStream.SET_POSITION);
123: connection.outputStream.flush();
124: return connection.inputStream.readLong();
125: }
126:
127: public static class ServerThread extends
128: it.unimi.dsi.mg4j.index.remote.ServerThread {
129: private static final boolean DEBUG = false;
130:
131: /** The remoted input stream. */
132: private final FastBufferedInputStream remotedInputStream;
133:
134: //private final FileInputStream remotedInputStream;
135:
136: public ServerThread(final Socket socket,
137: final InputStream stream) throws IOException {
138: super (socket);
139: //this.remotedInputStream = (FileInputStream)stream;
140: // TODO: which buffer size?
141: this .remotedInputStream = new FastBufferedInputStream(
142: stream);
143: }
144:
145: public void run() {
146: try {
147: int command;
148: byte[] readBuf = ByteArrays.EMPTY_ARRAY;
149: for (;;) {
150: command = inputStream.readByte();
151: if (DEBUG)
152: LOGGER.debug("Received remote command: "
153: + command);
154:
155: switch (command) {
156:
157: case RemoteInputStream.READ_ARRAY:
158: // TODO: avoid reallocating the buffer
159: int len = inputStream.readInt();
160: if (readBuf.length < len)
161: readBuf = new byte[len];
162: int result = remotedInputStream.read(readBuf,
163: 0, len);
164: outputStream.writeInt(result);
165: if (result > 0)
166: outputStream.write(readBuf, 0, result);
167: outputStream.flush();
168: break;
169:
170: case RemoteInputStream.READ_BYTE:
171: outputStream
172: .writeInt(remotedInputStream.read());
173: outputStream.flush();
174: break;
175:
176: case RemoteInputStream.SET_POSITION:
177: //remotedInputStream.getChannel().position( inputStream.readLong() );
178: remotedInputStream.position(inputStream
179: .readLong());
180: break;
181:
182: case RemoteInputStream.READ_POSITION:
183: //outputStream.writeLong( remotedInputStream.getChannel().position() );
184: outputStream.writeLong(remotedInputStream
185: .position());
186: outputStream.flush();
187: break;
188:
189: case RemoteInputStream.AVAILABLE:
190: outputStream.writeLong(remotedInputStream
191: .available());
192: outputStream.flush();
193: break;
194:
195: case RemoteInputStream.SKIP:
196: outputStream.writeLong(remotedInputStream
197: .skip(inputStream.readLong()));
198: outputStream.flush();
199: break;
200:
201: case RemoteInputStream.CLOSE:
202: return;
203:
204: default:
205: LOGGER.error("Unknown remote command: "
206: + command);
207: }
208: }
209: } catch (EOFException e) {
210: LOGGER.warn("The socket has been closed");
211: } catch (Exception e) {
212: LOGGER.fatal(e, e);
213: } finally {
214: try {
215: remotedInputStream.close();
216: // We don't close the socket--the caller should
217: } catch (IOException e) {
218: }
219: }
220: }
221: }
222: }
|