001: /*
002: * @(#)StreamDemultiplexor.java 0.3-2 18/06/1999
003: *
004: * This file is part of the HTTPClient package
005: * Copyright (C) 1996-1999 Ronald Tschalär
006: *
007: * This library is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU Lesser General Public
009: * License as published by the Free Software Foundation; either
010: * version 2 of the License, or (at your option) any later version.
011: *
012: * This library is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this library; if not, write to the Free
019: * Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
020: * MA 02111-1307, USA
021: *
022: * For questions, suggestions, bug-reports, enhancement-requests etc.
023: * I may be contacted at:
024: *
025: * ronald@innovation.ch
026: *
027: */
028:
029: package HTTPClient;
030:
031: import java.io.*;
032: import java.net.Socket;
033: import java.util.Vector;
034: import java.util.Enumeration;
035:
036: /**
037: * This class handles the demultiplexing of an input stream. This is needed
038: * for things like keep-alive in HTTP/1.0, persist in HTTP/1.1 and in HTTP-NG.
039: *
040: * @version 0.3-2 18/06/1999
041: * @author Ronald Tschalär
042: */
043:
044: class StreamDemultiplexor implements GlobalConstants {
045: /** the protocol we're handling requests for */
046: private int Protocol;
047:
048: /** the connection we're working for */
049: private HTTPConnection Connection;
050:
051: /** the input stream to demultiplex */
052: private ExtBufferedInputStream Stream;
053:
054: /** the socket this hangs off */
055: private Socket Sock = null;
056:
057: /** signals after the closing of which stream to close the socket */
058: private ResponseHandler MarkedForClose;
059:
060: /** timer used to close the socket if unused for a given time */
061: private SocketTimeout.TimeoutEntry Timer = null;
062:
063: /** timer thread which implements the timers */
064: private static SocketTimeout TimerThread = null;
065:
066: /** the list of response handlers we're serving */
067: private LinkedList RespHandlerList;
068:
069: /** number of unread bytes in current chunk (if transf-enc == chunked) */
070: private int chunk_len;
071:
072: /** the currently set timeout for the socket */
073: private int cur_timeout = 0;
074:
075: static {
076: TimerThread = new SocketTimeout(60);
077: TimerThread.start();
078: }
079:
080: // Constructors
081:
/**
 * Creates a demultiplexor serving the given socket.
 *
 * @param protocol   the protocol used on this stream.
 * @param sock       the socket whose input stream is to be demultiplexed.
 * @param connection the http-connection this socket belongs to.
 * @exception IOException if initializing the demultiplexor with
 *                        <var>sock</var> fails.
 */
StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection)
        throws IOException {
    Protocol = protocol;
    Connection = connection;

    // the handler list must exist before init() hands us over to the timer
    RespHandlerList = new LinkedList();
    init(sock);
}
096:
097: /**
098: * Initializes the demultiplexor with a new socket.
099: *
100: * @param stream the stream to demultiplex
101: */
102: private void init(Socket sock) throws IOException {
103: if (DebugDemux)
104: System.err
105: .println("Demux: Initializing Stream Demultiplexor ("
106: + this .hashCode() + ")");
107:
108: this .Sock = sock;
109: this .Stream = new ExtBufferedInputStream(sock.getInputStream());
110: MarkedForClose = null;
111: chunk_len = -1;
112:
113: // start a timer to close the socket after 60 seconds
114: Timer = TimerThread.setTimeout(this );
115: }
116:
117: // Methods
118:
119: /**
120: * Each Response must register with us.
121: */
122: void register(Response resp_handler, Request req)
123: throws RetryException {
124: synchronized (RespHandlerList) {
125: if (Sock == null)
126: throw new RetryException();
127:
128: RespHandlerList.addToEnd(new ResponseHandler(resp_handler,
129: req, this ));
130: }
131: }
132:
133: /**
134: * creates an input stream for the response.
135: *
136: * @param resp the response structure requesting the stream
137: * @return an InputStream
138: */
139: RespInputStream getStream(Response resp) {
140: ResponseHandler resph;
141: for (resph = (ResponseHandler) RespHandlerList.enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList
142: .next()) {
143: if (resph.resp == resp)
144: break;
145: }
146:
147: if (resph != null)
148: return resph.stream;
149: else
150: return null;
151: }
152:
153: /**
154: * Restarts the timer thread that will close an unused socket after
155: * 60 seconds.
156: */
157: void restartTimer() {
158: if (Timer != null)
159: Timer.reset();
160: }
161:
162: /**
163: * reads an array of bytes from the master stream.
164: */
165: int read(byte[] b, int off, int len, ResponseHandler resph,
166: int timeout) throws IOException {
167: if (resph.exception != null)
168: throw (IOException) resph.exception.fillInStackTrace();
169:
170: if (resph.eof)
171: return -1;
172:
173: // read the headers and data for all responses preceding us.
174:
175: ResponseHandler head;
176: while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null
177: && head != resph) {
178: try {
179: head.stream.readAll(timeout);
180: } catch (IOException ioe) {
181: if (ioe instanceof InterruptedIOException)
182: throw ioe;
183: else
184: throw (IOException) resph.exception
185: .fillInStackTrace();
186: }
187: }
188:
189: // Now we can read from the stream.
190:
191: synchronized (this ) {
192: if (resph.exception != null)
193: throw (IOException) resph.exception.fillInStackTrace();
194:
195: if (DebugDemux) {
196: if (resph.resp.cd_type != CD_HDRS)
197: System.err.println("Demux: Reading for stream "
198: + resph.stream.hashCode() + " ("
199: + Thread.currentThread() + ")");
200: }
201:
202: if (Timer != null)
203: Timer.hyber();
204:
205: try {
206: int rcvd = -1;
207:
208: if (timeout != cur_timeout) {
209: if (DebugDemux) {
210: System.err.println("Demux: Setting timeout to "
211: + timeout + " ms");
212: }
213:
214: try {
215: Sock.setSoTimeout(timeout);
216: } catch (Throwable t) {
217: }
218: cur_timeout = timeout;
219: }
220:
221: switch (resph.resp.cd_type) {
222: case CD_HDRS:
223: rcvd = Stream.read(b, off, len);
224: if (rcvd == -1)
225: throw new EOFException(
226: "Premature EOF encountered");
227: break;
228:
229: case CD_0:
230: rcvd = -1;
231: close(resph);
232: break;
233:
234: case CD_CLOSE:
235: rcvd = Stream.read(b, off, len);
236: if (rcvd == -1)
237: close(resph);
238: break;
239:
240: case CD_CONTLEN:
241: int cl = resph.resp.ContentLength;
242: if (len > cl - resph.stream.count)
243: len = cl - resph.stream.count;
244:
245: rcvd = Stream.read(b, off, len);
246: if (rcvd == -1)
247: throw new EOFException(
248: "Premature EOF encountered");
249:
250: if (resph.stream.count + rcvd == cl)
251: close(resph);
252:
253: break;
254:
255: case CD_CHUNKED:
256: if (chunk_len == -1) // it's a new chunk
257: chunk_len = Codecs.getChunkLength(Stream);
258:
259: if (chunk_len > 0) // it's data
260: {
261: if (len > chunk_len)
262: len = chunk_len;
263: rcvd = Stream.read(b, off, len);
264: if (rcvd == -1)
265: throw new EOFException(
266: "Premature EOF encountered");
267: chunk_len -= rcvd;
268: if (chunk_len == 0) // got the whole chunk
269: {
270: Stream.read(); // CR
271: Stream.read(); // LF
272: chunk_len = -1;
273: }
274: } else // the footers (trailers)
275: {
276: resph.resp.readTrailers(Stream);
277: rcvd = -1;
278: close(resph);
279: chunk_len = -1;
280: }
281: break;
282:
283: case CD_MP_BR:
284: byte[] endbndry = resph.getEndBoundary(Stream);
285: int[] end_cmp = resph.getEndCompiled(Stream);
286:
287: rcvd = Stream.read(b, off, len);
288: if (rcvd == -1)
289: throw new EOFException(
290: "Premature EOF encountered");
291:
292: int ovf = Stream.pastEnd(endbndry, end_cmp);
293: if (ovf != -1) {
294: rcvd -= ovf;
295: Stream.reset();
296: close(resph);
297: }
298:
299: break;
300:
301: default:
302: throw new Error(
303: "Internal Error in StreamDemultiplexor: "
304: + "Invalid cd_type "
305: + resph.resp.cd_type);
306: }
307:
308: restartTimer();
309: return rcvd;
310:
311: } catch (InterruptedIOException ie) // don't intercept this one
312: {
313: restartTimer();
314: throw ie;
315: } catch (IOException ioe) {
316: if (DebugDemux) {
317: System.err.print("Demux: ("
318: + Thread.currentThread() + ") ");
319: ioe.printStackTrace();
320: }
321:
322: close(ioe, true);
323: throw resph.exception; // set by retry_requests
324: } catch (ParseException pe) {
325: if (DebugDemux) {
326: System.err.print("Demux: ("
327: + Thread.currentThread() + ") ");
328: pe.printStackTrace();
329: }
330:
331: close(new IOException(pe.toString()), true);
332: throw resph.exception; // set by retry_requests
333: }
334: }
335: }
336:
337: /**
338: * skips a number of bytes in the master stream. This is done via a
339: * dummy read, as the socket input stream doesn't like skip()'s.
340: */
341: synchronized long skip(long num, ResponseHandler resph)
342: throws IOException {
343: if (resph.exception != null)
344: throw (IOException) resph.exception.fillInStackTrace();
345:
346: if (resph.eof)
347: return 0;
348:
349: byte[] dummy = new byte[(int) num];
350: int rcvd = read(dummy, 0, (int) num, resph, 0);
351: if (rcvd == -1)
352: return 0;
353: else
354: return rcvd;
355: }
356:
357: /**
358: * Determines the number of available bytes.
359: */
360: synchronized int available(ResponseHandler resph)
361: throws IOException {
362: int avail = Stream.available();
363: if (resph == null)
364: return avail;
365:
366: if (resph.exception != null)
367: throw (IOException) resph.exception.fillInStackTrace();
368:
369: if (resph.eof)
370: return 0;
371:
372: switch (resph.resp.cd_type) {
373: case CD_0:
374: return 0;
375: case CD_HDRS:
376: // this is something of a hack; I could return 0, but then
377: // if you were waiting for something on a response that
378: // wasn't first in line (and you didn't try to read the
379: // other response) you'd wait forever. On the other hand,
380: // we might be making a false promise here...
381: return (avail > 0 ? 1 : 0);
382: case CD_CLOSE:
383: return avail;
384: case CD_CONTLEN:
385: int cl = resph.resp.ContentLength;
386: cl -= resph.stream.count;
387: return (avail < cl ? avail : cl);
388: case CD_CHUNKED:
389: return avail; // not perfect...
390: case CD_MP_BR:
391: return avail; // not perfect...
392: default:
393: throw new Error("Internal Error in StreamDemultiplexor: "
394: + "Invalid cd_type " + resph.resp.cd_type);
395: }
396:
397: }
398:
399: /**
400: * Closes the socket and all associated streams. If <var>exception</var>
401: * is not null then all active requests are retried.
402: *
403: * <P>There are five ways this method may be activated. 1) if an exception
404: * occurs during read or write. 2) if the stream is marked for close but
405: * no responses are outstanding (e.g. due to a timeout). 3) when the
406: * markedForClose response is closed. 4) if all response streams up until
407: * and including the markedForClose response have been closed. 5) if this
408: * demux is finalized.
409: *
410: * @param exception the IOException to be sent to the streams.
411: * @param was_reset if true then the exception is due to a connection
412: * reset; otherwise it means we generated the exception
413: * ourselves and this is a "normal" close.
414: */
415: synchronized void close(IOException exception, boolean was_reset) {
416: if (Sock == null) // already cleaned up
417: return;
418:
419: if (DebugDemux)
420: System.err
421: .println("Demux: Closing all streams and socket ("
422: + this .hashCode() + ")");
423:
424: try {
425: Stream.close();
426: } catch (IOException ioe) {
427: }
428: try {
429: Sock.close();
430: } catch (IOException ioe) {
431: }
432: Sock = null;
433:
434: if (Timer != null) {
435: Timer.kill();
436: Timer = null;
437: }
438:
439: Connection.DemuxList.remove(this );
440:
441: // Here comes the tricky part: redo outstanding requests!
442:
443: if (exception != null)
444: synchronized (RespHandlerList) {
445: retry_requests(exception, was_reset);
446: }
447: }
448:
449: /**
450: * Retries outstanding requests. Well, actually the RetryModule does
451: * that. Here we just throw a RetryException for each request so that
452: * the RetryModule can catch and handle them.
453: *
454: * @param exception the exception that led to this call.
455: * @param was_reset this flag is passed to the RetryException and is
456: * used by the RetryModule to distinguish abnormal closes
457: * from expected closes.
458: */
459: private void retry_requests(IOException exception, boolean was_reset) {
460: RetryException first = null, prev = null;
461: ResponseHandler resph = (ResponseHandler) RespHandlerList
462: .enumerate();
463:
464: while (resph != null) {
465: /* if the application is already reading the data then the
466: * response has already been handled. In this case we must
467: * throw the real exception.
468: */
469: if (resph.resp.got_headers) {
470: resph.exception = exception;
471: } else {
472: RetryException tmp = new RetryException(exception
473: .getMessage());
474: if (first == null)
475: first = tmp;
476:
477: tmp.request = resph.request;
478: tmp.response = resph.resp;
479: tmp.exception = exception;
480: tmp.conn_reset = was_reset;
481: tmp.first = first;
482: tmp.addToListAfter(prev);
483:
484: prev = tmp;
485: resph.exception = tmp;
486: }
487:
488: RespHandlerList.remove(resph);
489: resph = (ResponseHandler) RespHandlerList.next();
490: }
491: }
492:
493: /**
494: * Closes the associated stream. If this one has been markedForClose then
495: * the socket is closed; else closeSocketIfAllStreamsClosed is invoked.
496: */
497: synchronized void close(ResponseHandler resph) {
498: if (resph != (ResponseHandler) RespHandlerList.getFirst())
499: return;
500:
501: if (DebugDemux)
502: System.err.println("Demux: Closing stream "
503: + resph.stream.hashCode() + " ("
504: + Thread.currentThread() + ")");
505:
506: resph.eof = true;
507: RespHandlerList.remove(resph);
508:
509: if (resph == MarkedForClose)
510: close(new IOException("Premature end of Keep-Alive"), false);
511: else
512: closeSocketIfAllStreamsClosed();
513: }
514:
515: /**
516: * Close the socket if all the streams have been closed.
517: *
518: * <P>When a stream reaches eof it is removed from the response handler
519: * list, but when somebody close()'s the response stream it is just
520: * marked as such. This means that all responses in the list have either
521: * not been read at all or only partially read, but they might have been
522: * close()'d meaning that nobody is interested in the data. So If all the
523: * response streams up till and including the one markedForClose have
524: * been close()'d then we can remove them from our list and close the
525: * socket.
526: *
527: * <P>Note: if the response list is emtpy or if no response is
528: * markedForClose then this method does nothing. Specifically it does
529: * not close the socket. We only want to close the socket if we've been
530: * told to do so.
531: *
532: * <P>Also note that there might still be responses in the list after
533: * the markedForClose one. These are due to us having pipelined more
534: * requests to the server than it's willing to serve on a single
535: * connection. These requests will be retried if possible.
536: */
537: synchronized void closeSocketIfAllStreamsClosed() {
538: synchronized (RespHandlerList) {
539: ResponseHandler resph = (ResponseHandler) RespHandlerList
540: .enumerate();
541:
542: while (resph != null && resph.stream.closed) {
543: if (resph == MarkedForClose) {
544: // remove all response handlers first
545: ResponseHandler tmp;
546: do {
547: tmp = (ResponseHandler) RespHandlerList
548: .getFirst();
549: RespHandlerList.remove(tmp);
550: } while (tmp != resph);
551:
552: // close the socket
553: close(
554: new IOException(
555: "Premature end of Keep-Alive"),
556: false);
557: return;
558: }
559:
560: resph = (ResponseHandler) RespHandlerList.next();
561: }
562: }
563: }
564:
565: /**
566: * returns the socket associated with this demux
567: */
568: synchronized Socket getSocket() {
569: if (MarkedForClose != null)
570: return null;
571:
572: if (Timer != null)
573: Timer.hyber();
574: return Sock;
575: }
576:
577: /**
578: * Mark this demux to not accept any more request and to close the
579: * stream after this <var>resp</var>onse or all requests have been
580: * processed, or close immediately if no requests are registered.
581: *
582: * @param response the Response after which the connection should
583: * be closed.
584: */
585: synchronized void markForClose(Response resp) {
586: synchronized (RespHandlerList) {
587: if (RespHandlerList.getFirst() == null) // no active request,
588: { // so close the socket
589: close(new IOException("Premature end of Keep-Alive"),
590: false);
591: return;
592: }
593: }
594:
595: if (Timer != null) {
596: Timer.kill();
597: Timer = null;
598: }
599:
600: ResponseHandler resph, lasth = null;
601: for (resph = (ResponseHandler) RespHandlerList.enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList
602: .next()) {
603: if (resph.resp == resp) // new resp precedes any others
604: {
605: MarkedForClose = resph;
606:
607: if (DebugDemux)
608: System.err.println("Demux: stream "
609: + resp.inp_stream.hashCode()
610: + " marked for close ("
611: + Thread.currentThread() + ")");
612:
613: closeSocketIfAllStreamsClosed();
614: return;
615: }
616:
617: if (MarkedForClose == resph)
618: return; // already marked for closing after an earlier resp
619:
620: lasth = resph;
621: }
622:
623: if (lasth == null)
624: return;
625:
626: MarkedForClose = lasth; // resp == null, so use last resph
627: closeSocketIfAllStreamsClosed();
628:
629: if (DebugDemux)
630: System.err.println("Demux: stream "
631: + lasth.stream.hashCode() + " marked for close ("
632: + Thread.currentThread() + ")");
633: }
634:
635: /**
636: * Emergency stop. Closes the socket and notifies the responses that
637: * the requests are aborted.
638: *
639: * @since V0.3
640: */
641: void abort() {
642: if (DebugDemux)
643: System.err.println("Demux: Aborting socket ("
644: + this .hashCode() + ")");
645:
646: // notify all responses of abort
647:
648: synchronized (RespHandlerList) {
649: for (ResponseHandler resph = (ResponseHandler) RespHandlerList
650: .enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList
651: .next()) {
652: if (resph.resp.http_resp != null)
653: resph.resp.http_resp.markAborted();
654: if (resph.exception == null)
655: resph.exception = new IOException(
656: "Request aborted by user");
657: }
658:
659: /* Close the socket.
660: * Note: this duplicates most of close(IOException, boolean). We
661: * do *not* call close() because that is synchronized, but we want
662: * abort() to be asynch.
663: */
664: if (Sock != null) {
665: try {
666: try {
667: Sock.setSoLinger(false, 0);
668: } catch (Throwable t) {
669: }
670:
671: try {
672: Stream.close();
673: } catch (IOException ioe) {
674: }
675: try {
676: Sock.close();
677: } catch (IOException ioe) {
678: }
679: Sock = null;
680:
681: if (Timer != null) {
682: Timer.kill();
683: Timer = null;
684: }
685: } catch (NullPointerException npe) {
686: }
687:
688: Connection.DemuxList.remove(this );
689: }
690: }
691: }
692:
693: /**
694: * A safety net to close the connection.
695: */
696: protected void finalize() throws Throwable {
697: close((IOException) null, false);
698: super .finalize();
699: }
700:
701: /**
702: * produces a string.
703: * @return a string containing the class name and protocol number
704: */
705: public String toString() {
706: String prot;
707:
708: switch (Protocol) {
709: case HTTP:
710: prot = "HTTP";
711: break;
712: case HTTPS:
713: prot = "HTTPS";
714: break;
715: case SHTTP:
716: prot = "SHTTP";
717: break;
718: case HTTP_NG:
719: prot = "HTTP_NG";
720: break;
721: default:
722: throw new Error(
723: "HTTPClient Internal Error: invalid protocol "
724: + Protocol);
725: }
726:
727: return getClass().getName() + "[Protocol=" + prot + "]";
728: }
729: }
730:
731: /**
732: * This thread is used to implement socket timeouts. It keeps a list of
733: * timer entries and expries them after a given time.
734: */
735: class SocketTimeout extends Thread implements GlobalConstants {
736: /**
737: * This class represents a timer entry. It is used to close an
738: * inactive socket after n seconds. Once running, the timer may be
739: * suspended (hyber()), restarted (reset()), or aborted (kill()).
740: * When the timer expires it invokes markForClose() on the
741: * associated stream demultipexer.
742: */
743: class TimeoutEntry {
744: boolean restart = false, hyber = false, alive = true;
745: StreamDemultiplexor demux;
746: TimeoutEntry next = null, prev = null;
747:
748: TimeoutEntry(StreamDemultiplexor demux) {
749: this .demux = demux;
750: }
751:
752: void reset() {
753: hyber = false;
754: if (restart)
755: return;
756: restart = true;
757:
758: synchronized (time_list) {
759: if (!alive)
760: return;
761:
762: // remove from current position
763: next.prev = prev;
764: prev.next = next;
765:
766: // and add to end of timeout list
767: next = time_list[current];
768: prev = time_list[current].prev;
769: prev.next = this ;
770: next.prev = this ;
771: }
772: }
773:
774: void hyber() {
775: if (alive)
776: hyber = true;
777: }
778:
779: void kill() {
780: alive = false;
781: restart = false;
782: hyber = false;
783:
784: synchronized (time_list) {
785: if (prev == null)
786: return;
787: next.prev = prev;
788: prev.next = next;
789: prev = null;
790: }
791: }
792: }
793:
794: private TimeoutEntry[] time_list;
795: private int current;
796:
797: SocketTimeout(int secs) {
798: super ("SocketTimeout");
799:
800: try {
801: setDaemon(true);
802: } catch (SecurityException se) {
803: } // Oh well...
804: setPriority(MAX_PRIORITY);
805:
806: time_list = new TimeoutEntry[secs];
807: for (int idx = 0; idx < secs; idx++) {
808: time_list[idx] = new TimeoutEntry(null);
809: time_list[idx].next = time_list[idx].prev = time_list[idx];
810: }
811: current = 0;
812: }
813:
814: public TimeoutEntry setTimeout(StreamDemultiplexor demux) {
815: TimeoutEntry entry = new TimeoutEntry(demux);
816: synchronized (time_list) {
817: entry.next = time_list[current];
818: entry.prev = time_list[current].prev;
819: entry.prev.next = entry;
820: entry.next.prev = entry;
821: }
822:
823: return entry;
824: }
825:
826: /**
827: * This timer is implemented by sleeping for 1 second and then
828: * checking the timer list.
829: */
830: public void run() {
831: TimeoutEntry marked = null;
832:
833: while (true) {
834: try {
835: sleep(1000L);
836: } catch (InterruptedException ie) {
837: }
838:
839: synchronized (time_list) {
840: // reset all restart flags
841: for (TimeoutEntry entry = time_list[current].next; entry != time_list[current]; entry = entry.next) {
842: entry.restart = false;
843: }
844:
845: current++;
846: if (current >= time_list.length)
847: current = 0;
848:
849: // remove all expired timers
850: for (TimeoutEntry entry = time_list[current].next; entry != time_list[current]; entry = entry.next) {
851: if (entry.alive && !entry.hyber) {
852: TimeoutEntry prev = entry.prev;
853: entry.kill();
854: /* put on death row. Note: we must not invoke
855: * markForClose() here because it is synch'd
856: * and can therefore lead to a deadlock if that
857: * thread is trying to do a reset() or kill()
858: */
859: entry.next = marked;
860: marked = entry;
861: entry = prev;
862: }
863: }
864: }
865:
866: while (marked != null) {
867: marked.demux.markForClose(null);
868: marked = marked.next;
869: }
870: }
871: }
872: }
|