001: // Shuffler.java
002: // $Id: Shuffler.java,v 1.6 2000/08/16 21:37:41 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // Please first read the full copyright statement in file COPYRIGHT.html
005:
006: package org.w3c.jigsaw.http;
007:
008: import java.io.File;
009: import java.io.FileDescriptor;
010: import java.io.FileInputStream;
011: import java.io.FileNotFoundException;
012: import java.io.FileOutputStream;
013: import java.io.IOException;
014: import java.io.PrintStream;
015:
016: import java.util.Vector;
017:
018: /**
019: * This describes the message structure that we exchange with the shuffler.
020: * The equivalent C structure is defined in ShufflerProtocol.h
021: */
022:
023: class ShuffleMessage {
024: byte op;
025: int id;
026: int length;
027: }
028:
029: /**
030: * Manage the queue of pending shuffler requests.
031: * This object manages the queue of pending shuffler requests. It life goes
032: * like this: wait for something to be put on the queue, once the queue is
033: * not empty, send some request to the underlying <em>shuffler</em> process
034: * and try getting <strong>DONE</strong> messages from the shuffler.
035: * For each message got, find back the appropriate handler and terminate it.
036: */
037:
038: class ShufflerThread extends Thread {
039: private static final boolean debug = true;
040:
041: Shuffler s;
042: Vector q;
043:
044: /**
045: * Add the given handler in our wait queue.
046: * @param h The handler to wait for.
047: */
048:
049: synchronized void registerHandler(ShuffleHandler h) {
050: q.addElement(h);
051: notifyAll();
052: }
053:
054: /**
055: * Process the given shuffler process message.
056: * This method can emit a RuntimeException if some internal state becomes
057: * inconsistent. This is typically the case if we can find back a request
058: * from the queue.
059: * @param msg The (process) shuffler message to handle.
060: */
061:
062: synchronized void processMessage(ShuffleMessage msg) {
063: int id = msg.id;
064: // FIXME: use some thing better than liner lookup here ?
065: for (int i = 0; i < q.size(); i++) {
066: ShuffleHandler h = (ShuffleHandler) q.elementAt(i);
067: if (h.id == id) {
068: q.removeElementAt(i);
069: h.done(msg.length);
070: return;
071: }
072: }
073: for (int i = 0; i < q.size(); i++)
074: System.out.println("waiting for : " + q.elementAt(i));
075: throw new RuntimeException(this .getClass().getName()
076: + ": received unexpected id " + id);
077: }
078:
079: /**
080: * Block the thread until we get some pending shuffles to wait for.
081: */
082:
083: synchronized void waitForHandlers() {
084: while (q.size() == 0) {
085: try {
086: wait();
087: } catch (InterruptedException e) {
088: }
089: }
090: }
091:
092: public void run() {
093: while (true) {
094: waitForHandlers();
095: ShuffleMessage msg = s.getNextMessage();
096: processMessage(msg);
097: }
098: }
099:
100: ShufflerThread(Shuffler s) {
101: this .s = s;
102: this .q = new Vector();
103: setPriority(9);
104: setName("ShufflerThread");
105: setDaemon(true);
106: }
107:
108: }
109:
110: /**
111: * Objects describing pending shuffle requests.
112: */
113:
114: class ShuffleHandler {
115: FileDescriptor in = null;
116: FileDescriptor out = null;
117: boolean doneflag = false;
118: int id = -1;
119: int length = -1;
120:
121: /**
122: * Notify that this shuffle handle is now completed.
123: */
124:
125: synchronized void done(int length) {
126: this .length = length;
127: this .doneflag = true;
128: notifyAll();
129: }
130:
131: /**
132: * Wait for this shuffle completion.
133: * This method blocks the calling thread until the shuffle is completed.
134: */
135:
136: synchronized int waitForCompletion() {
137: while (!doneflag) {
138: try {
139: wait();
140: } catch (InterruptedException e) {
141: }
142: }
143: return length;
144: }
145:
146: /**
147: * Print a ShuffleHandler (for debugging).
148: */
149:
150: public String toString() {
151: return id + " " + doneflag;
152: }
153:
154: ShuffleHandler(FileDescriptor in, FileDescriptor out) {
155: this .in = in;
156: this .out = out;
157: }
158: }
159:
160: /**
161: * This class implements both a nice hack and some magic.
162: * It uses an underlying <em>shuffler</em> process to speed up the sending
163: * of big data files back to the client.
164: * <p>The protocol between the server and the shuffler is quite simple, one
165: * byte indicate the operation, which takes as argument two file descriptors.
166: */
167:
168: public class Shuffler {
169: /**
170: * The property giving the path of the shuffler server.
171: * The shuffler server is an optional server helper, that deals with
172: * serving resource contents. When resource contents can be efficiently
173: * messaged between process boundaries (eg using sendmsg), the shuffler
174: * server takes over the task of sending resource's content back to the
175: * client. This property gives the path of the shuffler server binary
176: * program.
177: */
178: public static final String SHUFFLER_P = "org.w3c.jigsaw.shuffler";
179:
180: private static Process shuffler = null;
181: private static boolean inited = false;
182: private ShufflerThread waiter = null;
183: private int fd = -1;
184:
185: private native int initialize(String path);
186:
187: private native synchronized int shuffle(ShuffleHandler h);
188:
189: private native int getNextMessage(ShuffleMessage msg);
190:
191: ShuffleMessage getNextMessage() {
192: ShuffleMessage msg = new ShuffleMessage();
193: int duration = 2;
194:
195: while (true) {
196: int ecode = getNextMessage(msg);
197:
198: if (duration < 250)
199: duration = (duration << 1);
200: if (ecode > 0) {
201: return msg;
202: } else if (ecode == 0) {
203: // yield and retyr (yes, this *is* pooling :=(
204: try {
205: Thread.sleep(duration);
206: } catch (InterruptedException e) {
207: }
208: Thread.yield();
209: } else if (ecode < 0) {
210: String m = (this .getClass().getName()
211: + "[getNextMessage]: failed (e=" + ecode + ")");
212: throw new RuntimeException(m);
213: }
214: }
215: }
216:
217: /**
218: * Initialize this class.
219: * This deserve a special method, since we want any exception to be
220: * caught when invoking theinstance constructor.
221: * <p>This method tries to launch the shuffler process automatically.
222: * @param path The driectory for UNIX socket bindings.
223: * @return A boolean <strong>true</strong> is every thing went fine,
224: * <strong>false</strong> otherwise.
225: */
226:
227: private synchronized boolean classInitialize(String path) {
228: File socket = new File(path + "/shuffler");
229: // Delete any old socket for this shuffler:
230: socket.delete();
231: // Load the native code:
232: Runtime.getRuntime().loadLibrary("Shuffle");
233: inited = true;
234: // Get the shuffler binary path:
235: String shuffler_bin = System.getProperty(SHUFFLER_P);
236: if (shuffler_bin == null)
237: return false;
238: // Run it:
239: try {
240: String args[] = new String[2];
241: args[0] = shuffler_bin;
242: args[1] = path + "/shuffler";
243: // This is intended for debug only, it makes the shuffler emit traces
244: // in the provided (-log) log file
245: // args[2] = "-v";
246: // args[3] = "-log";
247: // args[4] = "/nfs/usr/abaird/Jigsaw/logs/shuffler";
248: shuffler = Runtime.getRuntime().exec(args);
249: } catch (Exception e) {
250: throw new RuntimeException(this .getClass().getName()
251: + "[classInitialize]: "
252: + "unable to launch shuffler.");
253: }
254: // Wait for the shuffler to create its listening socket:
255: int timeout = 10000;
256: while ((timeout > 0) && (!socket.exists())) {
257: timeout -= 500;
258: try {
259: Thread.sleep(500);
260: } catch (InterruptedException e) {
261: }
262: }
263: if (!socket.exists()) {
264: throw new RuntimeException(this .getClass().getName()
265: + "[classInitialize]: "
266: + " didn't create its socket.");
267: }
268: return true;
269: }
270:
271: private int shuffle(FileDescriptor in, FileDescriptor out)
272: throws IOException {
273: ShuffleHandler handle = new ShuffleHandler(in, out);
274: // WARNING: code below shouldn't be changed it contains black magic
275: // The thing is that the shuffler is really fast, and can even
276: // finish its job before the waiter gets a chance to register the
277: // appropriate identifier (so it gets a reply for an identifier it
278: // doesn't know about).
279: // Synchronizing the waiter makes the 'shuffle' + 'register'
280: // operations atomic with regard to the waiter thread, which is
281: // what we want.
282: // If you have understood above comment, then there is no more black
283: // magic for you below.
284: synchronized (waiter) {
285: if ((handle.id = shuffle(handle)) < 0)
286: throw new IOException(this .getClass().getName()
287: + " unable to shuffle !");
288: waiter.registerHandler(handle);
289: }
290: return handle.waitForCompletion();
291: }
292:
293: /**
294: * Shuffle the given rteply body to the given client.
295: * This methods tries to outout the given reply
296: */
297:
298: public int shuffle(Client client, Reply reply) throws IOException {
299: FileDescriptor in = reply.getInputFileDescriptor();
300: if (in == null)
301: return -1;
302: // client.flushOutput() ;
303: // FileDescriptor out = client.getOutputFileDescriptor() ;
304: // int written = shuffle (in, out) ;
305: // if ( written < 0 )
306: // throw new IOException ("Shuffler failure.") ;
307: // return written ;
308: return -1;
309: }
310:
311: public synchronized void shutdown() {
312: // Kill the shuffler process (if needed) and waitfor its completion
313: if (shuffler != null) {
314: shuffler.destroy();
315: while (true) {
316: try {
317: shuffler.waitFor();
318: break;
319: } catch (InterruptedException ex) {
320: }
321: }
322: shuffler = null;
323: }
324: // Un-initialize, stop the waiter thread:
325: inited = false;
326: waiter.stop();
327: waiter = null;
328: }
329:
330: /**
331: * Create a new data shuffler.
332: * The path identifies the directory in which UNIX socket will get bind.
333: * This should be an absloute path, eg <code>/tmp/shuffler</code>.
334: * @param path The path to the server.
335: */
336:
337: public Shuffler(String path) {
338: if (((!inited) && (!classInitialize(path)))
339: || (initialize(path) < 0)) {
340: throw new RuntimeException(this .getClass().getName()
341: + ": unable to connect to shuffler " + path);
342: }
343: this .waiter = new ShufflerThread(this );
344: this .waiter.start();
345: }
346:
347: // testing only
348:
349: public static void main(String args[])
350: throws FileNotFoundException, IOException {
351: Shuffler s = new Shuffler(args[0]);
352: FileInputStream f = new FileInputStream("from");
353: FileOutputStream t = new FileOutputStream("to");
354: s.shuffle(f.getFD(), t.getFD());
355: f.close();
356: t.close();
357: }
358: }
|