001: package it.unimi.dsi.mg4j.index.remote;
002:
003: /*
004: * MG4J: Managing Gigabytes for Java
005: *
006: * Copyright (C) 2006-2007 Sebastiano Vigna
007: *
008: * This library is free software; you can redistribute it and/or modify it
009: * under the terms of the GNU Lesser General Public License as published by the Free
010: * Software Foundation; either version 2.1 of the License, or (at your option)
011: * any later version.
012: *
013: * This library is distributed in the hope that it will be useful, but
014: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
015: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
016: * for more details.
017: *
018: * You should have received a copy of the GNU Lesser General Public License
019: * along with this program; if not, write to the Free Software
020: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
021: *
022: */
023:
024: import it.unimi.dsi.fastutil.io.BinIO;
025: import it.unimi.dsi.mg4j.index.BitStreamIndex;
026: import it.unimi.dsi.mg4j.index.Index;
027: import it.unimi.dsi.mg4j.index.CompressionFlags.Coding;
028: import it.unimi.dsi.Util;
029:
030: import java.io.DataInputStream;
031: import java.io.DataOutputStream;
032: import java.io.IOException;
033: import java.lang.reflect.InvocationTargetException;
034: import java.net.InetAddress;
035: import java.net.ServerSocket;
036: import java.net.Socket;
037: import java.net.SocketAddress;
038: import java.net.URISyntaxException;
039: import java.util.concurrent.Executors;
040: import java.util.concurrent.ThreadPoolExecutor;
041:
042: import org.apache.commons.configuration.ConfigurationException;
043: import org.apache.log4j.Logger;
044:
045: import com.martiansoftware.jsap.FlaggedOption;
046: import com.martiansoftware.jsap.JSAP;
047: import com.martiansoftware.jsap.JSAPException;
048: import com.martiansoftware.jsap.JSAPResult;
049: import com.martiansoftware.jsap.Parameter;
050: import com.martiansoftware.jsap.SimpleJSAP;
051: import com.martiansoftware.jsap.Switch;
052: import com.martiansoftware.jsap.UnflaggedOption;
053:
054: /** A daemon exposing an index remotely.
055: *
056: * <p>MG4J provides remote access to indices using a client/server pair. A thread
057: * running the {@link #start(Index, InetAddress, int, boolean)} method of this
058: * class acts as a server.
059: *
060: * <p>Once a server is up and running, clients can connect by calling the
061: * {@link #getIndex(String, int, boolean, boolean)} method, which sends to
062: * the server a {@link #GET_INDEX} command and returns a (serialised) {@link it.unimi.dsi.mg4j.index.Index}
063: * that will handle all communication with the server transparently. The static
064: * factory methods in {@link it.unimi.dsi.mg4j.index.Index} (such as {@link it.unimi.dsi.mg4j.index.Index#getInstance(CharSequence)})
065: * will turn URIs such as <samp>mg4j://localhost</samp> into calls
066: * to {@link #getIndex(String, int, boolean, boolean)}.
067: *
068: * <p>Presently there are two main kinds of remote indices: those exposing the local
069: * bitstream remotely, and those exposing the results of document iterators remotely.
070: * In the first case, the bitstream is passed over the net, and it is decoded locally
071: * (it is <em>very</em> advisable not using a compression method requiring document sizes in that case,
072: * as the size list will be serialised into the returned {@link it.unimi.dsi.mg4j.index.Index} instance).
073: * When started, an index server will by default try to expose the local bitstream,
074: * if possible, but it can be forced not to do so with a suitable parameter. Depending
075: * on the kind of access requested, the index server will return instances of
076: * {@link it.unimi.dsi.mg4j.index.remote.RemoteBitStreamIndex}
077: * or instances of {@link it.unimi.dsi.mg4j.index.remote.RemoteIndex}. The two approaches vary
078: * wildly in performance, and some profiling with specific applications is advisable
079: * before choosing one method over the other.
080: *
081: * <p>All other server commands available spawn a new thread that will handle a specific
082: * data structure over the newly created socket. Thus, after writing the command
083: * {@link #GET_SIZE_LIST} the server will start a {@link it.unimi.dsi.mg4j.index.remote.RemoteSizeList.ServerThread}
084: * and answer to queries about the size list, whereas after writing the command
085: * {@link #GET_INDEX_READER} the server will start a
086: * {@link it.unimi.dsi.mg4j.index.remote.RemoteIndexReader.ServerThread}
087: * and answer to queries about the underlying local {@link it.unimi.dsi.mg4j.index.IndexReader}.
088: *
089: * <p>For simplicity and easiness in code editing, every client class starting a thread
090: * (e.g., {@link it.unimi.dsi.mg4j.index.remote.RemoteTermMap}, {@link it.unimi.dsi.mg4j.index.remote.RemoteIndexReader},
091: * …) sports a nested static class called <code>ServerThread</code>
092: * containing the code of the corresponding server thread. In this
093: * way client and server code sit in the same source file, making editing and debugging simpler.
094: *
095: * @author Alessandro Arrabito
096: * @author Sebastiano Vigna
097: */
098: public class IndexServer {
099:
100: public final static int DEFAULT_PORT = 9090;
101:
102: private static final Logger LOGGER = Util
103: .getLogger(IndexServer.class);
104:
105: public static final byte GET_INDEX = 0;
106: public static final byte GET_INDEX_READER = 1;
107: public static final byte GET_TERM_MAP = 2;
108: public static final byte GET_PREFIX_MAP = 3;
109: public static final byte GET_SIZE_LIST = 4;
110: public static final byte GET_OFFSET_LIST = 5;
111: public static final byte GET_CLIENT_INPUT_STREAM = 6;
112:
113: /** Returns an index object corresponding a given index server specified by host and port.
114: * @param host the server host.
115: * @param port the server port, or -1 for {@link #DEFAULT_PORT}.
116: * @param randomAccess whether the index should be accessible randomly.
117: * @param documentSizes if true, document sizes will be loaded (note that sometimes document sizes
118: * might be loaded anyway because the compression method for positions requires it).
119: */
120: public static Index getIndex(final String host, final int port,
121: final boolean randomAccess, final boolean documentSizes)
122: throws IOException, ClassNotFoundException {
123: final Socket socket = new Socket(host,
124: port == -1 ? DEFAULT_PORT : port);
125: LOGGER.debug("Accessing remote index at " + host + ":" + port
126: + "...");
127: final DataOutputStream outputStream = new DataOutputStream(
128: socket.getOutputStream());
129: outputStream.writeByte(GET_INDEX);
130: outputStream.writeBoolean(randomAccess);
131: outputStream.writeBoolean(documentSizes);
132: outputStream.flush();
133: Index index = (Index) BinIO.loadObject(socket.getInputStream());
134: socket.close();
135: LOGGER.debug("Index at " + socket + " downloaded: " + index);
136: return index;
137: }
138:
139: /** Starts an index server.
140: * @param index the underlying index.
141: * @param serverSocket the daemon socket.
142: * @param forceRemoteIndex force the thread to use a {@link RemoteIndex} instead
143: * of a {@link RemoteBitStreamIndex}, even if the latter would be usable.
144: */
145:
146: public static void start(final Index index,
147: final ServerSocket serverSocket, boolean forceRemoteIndex)
148: throws IOException {
149: LOGGER.info("Index server started at "
150: + serverSocket.getLocalSocketAddress());
151: ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors
152: .newCachedThreadPool();
153: Socket socket;
154: final SocketAddress localSocketAddress = serverSocket
155: .getLocalSocketAddress();
156: int command;
157: for (;;) {
158:
159: socket = serverSocket.accept();
160: command = socket.getInputStream().read();
161: LOGGER.debug("Remote command: " + command);
162:
163: switch (command) {
164: case GET_INDEX:
165: DataInputStream dis = new DataInputStream(socket
166: .getInputStream());
167: DataOutputStream dos = new DataOutputStream(socket
168: .getOutputStream());
169: boolean randomAccess = dis.readBoolean();
170: boolean documentSizes = dis.readBoolean();
171:
172: if (index instanceof BitStreamIndex
173: && !forceRemoteIndex) {
174: BitStreamIndex localIndex = (BitStreamIndex) index;
175: if (randomAccess && localIndex.offsets == null) {
176: randomAccess = false;
177: LOGGER
178: .warn("Random access will not be available for index "
179: + localIndex);
180: }
181: /** Note that in case of Golomb or interpolative position coding
182: * we are forced to serialise and transfer the entire size list,
183: * or decoding would be too slow. */
184: BinIO
185: .storeObject(
186: new RemoteBitStreamIndex(
187: localSocketAddress,
188: index.numberOfDocuments,
189: index.numberOfTerms,
190: index.numberOfPostings,
191: index.numberOfOccurrences,
192: index.maxCount,
193: localIndex.payload,
194: localIndex.frequencyCoding,
195: localIndex.pointerCoding,
196: localIndex.countCoding,
197: localIndex.positionCoding,
198: localIndex.quantum,
199: localIndex.height,
200: localIndex.bufferSize,
201: localIndex.termProcessor,
202: localIndex.field,
203: localIndex.properties,
204: localIndex.termMap != null ? new RemoteTermMap(
205: localSocketAddress,
206: index.numberOfTerms)
207: : null,
208: localIndex.prefixMap != null ? new RemotePrefixMap(
209: localSocketAddress,
210: index.numberOfTerms)
211: : null,
212: localIndex.positionCoding == Coding.GOLOMB
213: || localIndex.positionCoding == Coding.INTERPOLATIVE ? localIndex.sizes
214: : (documentSizes ? new RemoteSizeList(
215: localSocketAddress,
216: localIndex.numberOfDocuments)
217: : null),
218: randomAccess ? new RemoteOffsetList(
219: localSocketAddress,
220: localIndex.offsets
221: .size())
222: : null), dos);
223: } else
224: BinIO.storeObject(new RemoteIndex(
225: localSocketAddress,
226: index.numberOfDocuments,
227: index.numberOfTerms,
228: index.numberOfPostings,
229: index.numberOfOccurrences, index.maxCount,
230: index.payload, index.hasCounts,
231: index.hasPositions, index.termProcessor,
232: index.field,
233: (documentSizes ? new RemoteSizeList(
234: localSocketAddress,
235: index.numberOfDocuments) : null),
236: index.properties), dos);
237: dos.flush();
238: break;
239:
240: case GET_INDEX_READER:
241: threadPool.execute(new RemoteIndexReader.ServerThread(
242: socket, index));
243: break;
244:
245: case GET_TERM_MAP:
246: threadPool.execute(new RemoteTermMap.ServerThread(
247: socket, ((BitStreamIndex) index).termMap));
248: break;
249:
250: case GET_PREFIX_MAP:
251: threadPool.execute(new RemotePrefixMap.ServerThread(
252: socket, ((BitStreamIndex) index).prefixMap));
253: break;
254:
255: case GET_SIZE_LIST:
256: threadPool.execute(new RemoteSizeList.ServerThread(
257: socket, index.sizes));
258: break;
259:
260: case GET_OFFSET_LIST:
261: threadPool.execute(new RemoteOffsetList.ServerThread(
262: socket, ((BitStreamIndex) index).offsets));
263: break;
264:
265: case GET_CLIENT_INPUT_STREAM:
266: threadPool.execute(new RemoteInputStream.ServerThread(
267: socket, ((BitStreamIndex) index)
268: .getInputStream()));
269: break;
270: }
271:
272: }
273: }
274:
275: /** Starts an index-server daemon thread.
276: * @param index the underlying index.
277: * @param address the IP address for the daemon socket.
278: * @param port the IP port for the daemon socket.
279: * @param forceRemoteIndex force the thread to use a {@link RemoteIndex} instead
280: * of a {@link RemoteBitStreamIndex}, even if the latter would be usable.
281: */
282:
283: public static void start(final Index index,
284: final InetAddress address, final int port,
285: boolean forceRemoteIndex) throws IOException {
286: start(index, new ServerSocket(port, 0, address),
287: forceRemoteIndex);
288: }
289:
290: public static void main(final String[] arg)
291: throws ConfigurationException, IOException,
292: URISyntaxException, ClassNotFoundException, JSAPException,
293: SecurityException, InstantiationException,
294: IllegalAccessException, InvocationTargetException,
295: NoSuchMethodException {
296: SimpleJSAP jsap = new SimpleJSAP(
297: IndexServer.class.getName(),
298: "Starts a server index daemon.",
299: new Parameter[] {
300: new FlaggedOption("port", JSAP.INTEGER_PARSER,
301: "9090", JSAP.NOT_REQUIRED, 'p', "port",
302: "The server port."),
303: new Switch("forceremote", 'f', "force-remote",
304: "Forces a remote index instead of a remote bitstream index."),
305: new UnflaggedOption("ipaddr",
306: JSAP.INETADDRESS_PARSER, JSAP.REQUIRED,
307: "The server address."),
308: new UnflaggedOption("basename",
309: JSAP.STRING_PARSER, JSAP.REQUIRED,
310: "The basename or uri of the index") });
311:
312: JSAPResult jsapResult = jsap.parse(arg);
313: if (jsap.messagePrinted())
314: return;
315:
316: int port = jsapResult.getInt("port");
317: String basename = jsapResult.getString("basename");
318: boolean forceRemote = jsapResult.getBoolean("forceremote");
319: start(Index.getInstance(basename), jsapResult
320: .getInetAddress("ipaddr"), port, forceRemote);
321: }
322:
323: }
|