Source Code Cross Referenced for StreamDemultiplexor.java in  » Net » SkunkDAV » HTTPClient » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » SkunkDAV » HTTPClient 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * @(#)StreamDemultiplexor.java				0.3-2 18/06/1999
003:         *
004:         *  This file is part of the HTTPClient package
005:         *  Copyright (C) 1996-1999  Ronald Tschalär
006:         *
007:         *  This library is free software; you can redistribute it and/or
008:         *  modify it under the terms of the GNU Lesser General Public
009:         *  License as published by the Free Software Foundation; either
010:         *  version 2 of the License, or (at your option) any later version.
011:         *
012:         *  This library is distributed in the hope that it will be useful,
013:         *  but WITHOUT ANY WARRANTY; without even the implied warranty of
014:         *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
015:         *  Lesser General Public License for more details.
016:         *
017:         *  You should have received a copy of the GNU Lesser General Public
018:         *  License along with this library; if not, write to the Free
019:         *  Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
020:         *  MA 02111-1307, USA
021:         *
022:         *  For questions, suggestions, bug-reports, enhancement-requests etc.
023:         *  I may be contacted at:
024:         *
025:         *  ronald@innovation.ch
026:         *
027:         */
028:
029:        package HTTPClient;
030:
031:        import java.io.*;
032:        import java.net.Socket;
033:        import java.util.Vector;
034:        import java.util.Enumeration;
035:
036:        /**
037:         * This class handles the demultiplexing of input stream. This is needed
038:         * for things like keep-alive in HTTP/1.0, persist in HTTP/1.1 and in HTTP-NG.
039:         *
040:         * @version	0.3-2  18/06/1999
041:         * @author	Ronald Tschalär
042:         */
043:
044:        class StreamDemultiplexor implements  GlobalConstants {
045:            /** the protocol were handling request for */
046:            private int Protocol;
047:
048:            /** the connection we're working for */
049:            private HTTPConnection Connection;
050:
051:            /** the input stream to demultiplex */
052:            private ExtBufferedInputStream Stream;
053:
054:            /** the socket this hangs off */
055:            private Socket Sock = null;
056:
057:            /** signals after the closing of which stream to close the socket */
058:            private ResponseHandler MarkedForClose;
059:
060:            /** timer used to close the socket if unused for a given time */
061:            private SocketTimeout.TimeoutEntry Timer = null;
062:
063:            /** timer thread which implements the timers */
064:            private static SocketTimeout TimerThread = null;
065:
066:            /** a Vector to hold the list of response handlers were serving */
067:            private LinkedList RespHandlerList;
068:
069:            /** number of unread bytes in current chunk (if transf-enc == chunked) */
070:            private int chunk_len;
071:
072:            /** the currently set timeout for the socket */
073:            private int cur_timeout = 0;
074:
075:            static {
076:                TimerThread = new SocketTimeout(60);
077:                TimerThread.start();
078:            }
079:
080:            // Constructors
081:
082:            /**
083:             * a simple contructor.
084:             *
085:             * @param protocol   the protocol used on this stream.
086:             * @param sock       the socket which we're to demux.
087:             * @param connection the http-connection this socket belongs to.
088:             */
089:            StreamDemultiplexor(int protocol, Socket sock,
090:                    HTTPConnection connection) throws IOException {
091:                this .Protocol = protocol;
092:                this .Connection = connection;
093:                RespHandlerList = new LinkedList();
094:                init(sock);
095:            }
096:
097:            /**
098:             * Initializes the demultiplexor with a new socket.
099:             *
100:             * @param stream   the stream to demultiplex
101:             */
102:            private void init(Socket sock) throws IOException {
103:                if (DebugDemux)
104:                    System.err
105:                            .println("Demux: Initializing Stream Demultiplexor ("
106:                                    + this .hashCode() + ")");
107:
108:                this .Sock = sock;
109:                this .Stream = new ExtBufferedInputStream(sock.getInputStream());
110:                MarkedForClose = null;
111:                chunk_len = -1;
112:
113:                // start a timer to close the socket after 60 seconds
114:                Timer = TimerThread.setTimeout(this );
115:            }
116:
117:            // Methods
118:
119:            /**
120:             * Each Response must register with us.
121:             */
122:            void register(Response resp_handler, Request req)
123:                    throws RetryException {
124:                synchronized (RespHandlerList) {
125:                    if (Sock == null)
126:                        throw new RetryException();
127:
128:                    RespHandlerList.addToEnd(new ResponseHandler(resp_handler,
129:                            req, this ));
130:                }
131:            }
132:
133:            /**
134:             * creates an input stream for the response.
135:             *
136:             * @param resp the response structure requesting the stream
137:             * @return an InputStream
138:             */
139:            RespInputStream getStream(Response resp) {
140:                ResponseHandler resph;
141:                for (resph = (ResponseHandler) RespHandlerList.enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList
142:                        .next()) {
143:                    if (resph.resp == resp)
144:                        break;
145:                }
146:
147:                if (resph != null)
148:                    return resph.stream;
149:                else
150:                    return null;
151:            }
152:
153:            /**
154:             * Restarts the timer thread that will close an unused socket after
155:             * 60 seconds.
156:             */
157:            void restartTimer() {
158:                if (Timer != null)
159:                    Timer.reset();
160:            }
161:
162:            /**
163:             * reads an array of bytes from the master stream.
164:             */
165:            int read(byte[] b, int off, int len, ResponseHandler resph,
166:                    int timeout) throws IOException {
167:                if (resph.exception != null)
168:                    throw (IOException) resph.exception.fillInStackTrace();
169:
170:                if (resph.eof)
171:                    return -1;
172:
173:                // read the headers and data for all responses preceding us.
174:
175:                ResponseHandler head;
176:                while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null
177:                        && head != resph) {
178:                    try {
179:                        head.stream.readAll(timeout);
180:                    } catch (IOException ioe) {
181:                        if (ioe instanceof  InterruptedIOException)
182:                            throw ioe;
183:                        else
184:                            throw (IOException) resph.exception
185:                                    .fillInStackTrace();
186:                    }
187:                }
188:
189:                // Now we can read from the stream.
190:
191:                synchronized (this ) {
192:                    if (resph.exception != null)
193:                        throw (IOException) resph.exception.fillInStackTrace();
194:
195:                    if (DebugDemux) {
196:                        if (resph.resp.cd_type != CD_HDRS)
197:                            System.err.println("Demux: Reading for stream "
198:                                    + resph.stream.hashCode() + " ("
199:                                    + Thread.currentThread() + ")");
200:                    }
201:
202:                    if (Timer != null)
203:                        Timer.hyber();
204:
205:                    try {
206:                        int rcvd = -1;
207:
208:                        if (timeout != cur_timeout) {
209:                            if (DebugDemux) {
210:                                System.err.println("Demux: Setting timeout to "
211:                                        + timeout + " ms");
212:                            }
213:
214:                            try {
215:                                Sock.setSoTimeout(timeout);
216:                            } catch (Throwable t) {
217:                            }
218:                            cur_timeout = timeout;
219:                        }
220:
221:                        switch (resph.resp.cd_type) {
222:                        case CD_HDRS:
223:                            rcvd = Stream.read(b, off, len);
224:                            if (rcvd == -1)
225:                                throw new EOFException(
226:                                        "Premature EOF encountered");
227:                            break;
228:
229:                        case CD_0:
230:                            rcvd = -1;
231:                            close(resph);
232:                            break;
233:
234:                        case CD_CLOSE:
235:                            rcvd = Stream.read(b, off, len);
236:                            if (rcvd == -1)
237:                                close(resph);
238:                            break;
239:
240:                        case CD_CONTLEN:
241:                            int cl = resph.resp.ContentLength;
242:                            if (len > cl - resph.stream.count)
243:                                len = cl - resph.stream.count;
244:
245:                            rcvd = Stream.read(b, off, len);
246:                            if (rcvd == -1)
247:                                throw new EOFException(
248:                                        "Premature EOF encountered");
249:
250:                            if (resph.stream.count + rcvd == cl)
251:                                close(resph);
252:
253:                            break;
254:
255:                        case CD_CHUNKED:
256:                            if (chunk_len == -1) // it's a new chunk
257:                                chunk_len = Codecs.getChunkLength(Stream);
258:
259:                            if (chunk_len > 0) // it's data
260:                            {
261:                                if (len > chunk_len)
262:                                    len = chunk_len;
263:                                rcvd = Stream.read(b, off, len);
264:                                if (rcvd == -1)
265:                                    throw new EOFException(
266:                                            "Premature EOF encountered");
267:                                chunk_len -= rcvd;
268:                                if (chunk_len == 0) // got the whole chunk
269:                                {
270:                                    Stream.read(); // CR
271:                                    Stream.read(); // LF
272:                                    chunk_len = -1;
273:                                }
274:                            } else // the footers (trailers)
275:                            {
276:                                resph.resp.readTrailers(Stream);
277:                                rcvd = -1;
278:                                close(resph);
279:                                chunk_len = -1;
280:                            }
281:                            break;
282:
283:                        case CD_MP_BR:
284:                            byte[] endbndry = resph.getEndBoundary(Stream);
285:                            int[] end_cmp = resph.getEndCompiled(Stream);
286:
287:                            rcvd = Stream.read(b, off, len);
288:                            if (rcvd == -1)
289:                                throw new EOFException(
290:                                        "Premature EOF encountered");
291:
292:                            int ovf = Stream.pastEnd(endbndry, end_cmp);
293:                            if (ovf != -1) {
294:                                rcvd -= ovf;
295:                                Stream.reset();
296:                                close(resph);
297:                            }
298:
299:                            break;
300:
301:                        default:
302:                            throw new Error(
303:                                    "Internal Error in StreamDemultiplexor: "
304:                                            + "Invalid cd_type "
305:                                            + resph.resp.cd_type);
306:                        }
307:
308:                        restartTimer();
309:                        return rcvd;
310:
311:                    } catch (InterruptedIOException ie) // don't intercept this one
312:                    {
313:                        restartTimer();
314:                        throw ie;
315:                    } catch (IOException ioe) {
316:                        if (DebugDemux) {
317:                            System.err.print("Demux: ("
318:                                    + Thread.currentThread() + ") ");
319:                            ioe.printStackTrace();
320:                        }
321:
322:                        close(ioe, true);
323:                        throw resph.exception; // set by retry_requests
324:                    } catch (ParseException pe) {
325:                        if (DebugDemux) {
326:                            System.err.print("Demux: ("
327:                                    + Thread.currentThread() + ") ");
328:                            pe.printStackTrace();
329:                        }
330:
331:                        close(new IOException(pe.toString()), true);
332:                        throw resph.exception; // set by retry_requests
333:                    }
334:                }
335:            }
336:
337:            /**
338:             * skips a number of bytes in the master stream. This is done via a
339:             * dummy read, as the socket input stream doesn't like skip()'s.
340:             */
341:            synchronized long skip(long num, ResponseHandler resph)
342:                    throws IOException {
343:                if (resph.exception != null)
344:                    throw (IOException) resph.exception.fillInStackTrace();
345:
346:                if (resph.eof)
347:                    return 0;
348:
349:                byte[] dummy = new byte[(int) num];
350:                int rcvd = read(dummy, 0, (int) num, resph, 0);
351:                if (rcvd == -1)
352:                    return 0;
353:                else
354:                    return rcvd;
355:            }
356:
357:            /**
358:             * Determines the number of available bytes.
359:             */
360:            synchronized int available(ResponseHandler resph)
361:                    throws IOException {
362:                int avail = Stream.available();
363:                if (resph == null)
364:                    return avail;
365:
366:                if (resph.exception != null)
367:                    throw (IOException) resph.exception.fillInStackTrace();
368:
369:                if (resph.eof)
370:                    return 0;
371:
372:                switch (resph.resp.cd_type) {
373:                case CD_0:
374:                    return 0;
375:                case CD_HDRS:
376:                    // this is something of a hack; I could return 0, but then
377:                    // if you were waiting for something on a response that
378:                    // wasn't first in line (and you didn't try to read the
379:                    // other response) you'd wait forever. On the other hand,
380:                    // we might be making a false promise here...
381:                    return (avail > 0 ? 1 : 0);
382:                case CD_CLOSE:
383:                    return avail;
384:                case CD_CONTLEN:
385:                    int cl = resph.resp.ContentLength;
386:                    cl -= resph.stream.count;
387:                    return (avail < cl ? avail : cl);
388:                case CD_CHUNKED:
389:                    return avail; // not perfect...
390:                case CD_MP_BR:
391:                    return avail; // not perfect...
392:                default:
393:                    throw new Error("Internal Error in StreamDemultiplexor: "
394:                            + "Invalid cd_type " + resph.resp.cd_type);
395:                }
396:
397:            }
398:
399:            /**
400:             * Closes the socket and all associated streams. If <var>exception</var>
401:             * is not null then all active requests are retried.
402:             *
403:             * <P>There are five ways this method may be activated. 1) if an exception
404:             * occurs during read or write. 2) if the stream is marked for close but
405:             * no responses are outstanding (e.g. due to a timeout). 3) when the
406:             * markedForClose response is closed. 4) if all response streams up until
407:             * and including the markedForClose response have been closed. 5) if this
408:             * demux is finalized.
409:             *
410:             * @param exception the IOException to be sent to the streams.
411:             * @param was_reset if true then the exception is due to a connection
412:             *                  reset; otherwise it means we generated the exception
413:             *                  ourselves and this is a "normal" close.
414:             */
415:            synchronized void close(IOException exception, boolean was_reset) {
416:                if (Sock == null) // already cleaned up
417:                    return;
418:
419:                if (DebugDemux)
420:                    System.err
421:                            .println("Demux: Closing all streams and socket ("
422:                                    + this .hashCode() + ")");
423:
424:                try {
425:                    Stream.close();
426:                } catch (IOException ioe) {
427:                }
428:                try {
429:                    Sock.close();
430:                } catch (IOException ioe) {
431:                }
432:                Sock = null;
433:
434:                if (Timer != null) {
435:                    Timer.kill();
436:                    Timer = null;
437:                }
438:
439:                Connection.DemuxList.remove(this );
440:
441:                // Here comes the tricky part: redo outstanding requests!
442:
443:                if (exception != null)
444:                    synchronized (RespHandlerList) {
445:                        retry_requests(exception, was_reset);
446:                    }
447:            }
448:
449:            /**
450:             * Retries outstanding requests. Well, actually the RetryModule does
451:             * that. Here we just throw a RetryException for each request so that
452:             * the RetryModule can catch and handle them.
453:             *
454:             * @param exception the exception that led to this call.
455:             * @param was_reset this flag is passed to the RetryException and is
456:             *                  used by the RetryModule to distinguish abnormal closes
457:             *                  from expected closes.
458:             */
459:            private void retry_requests(IOException exception, boolean was_reset) {
460:                RetryException first = null, prev = null;
461:                ResponseHandler resph = (ResponseHandler) RespHandlerList
462:                        .enumerate();
463:
464:                while (resph != null) {
465:                    /* if the application is already reading the data then the
466:                     * response has already been handled. In this case we must
467:                     * throw the real exception.
468:                     */
469:                    if (resph.resp.got_headers) {
470:                        resph.exception = exception;
471:                    } else {
472:                        RetryException tmp = new RetryException(exception
473:                                .getMessage());
474:                        if (first == null)
475:                            first = tmp;
476:
477:                        tmp.request = resph.request;
478:                        tmp.response = resph.resp;
479:                        tmp.exception = exception;
480:                        tmp.conn_reset = was_reset;
481:                        tmp.first = first;
482:                        tmp.addToListAfter(prev);
483:
484:                        prev = tmp;
485:                        resph.exception = tmp;
486:                    }
487:
488:                    RespHandlerList.remove(resph);
489:                    resph = (ResponseHandler) RespHandlerList.next();
490:                }
491:            }
492:
493:            /**
494:             * Closes the associated stream. If this one has been markedForClose then
495:             * the socket is closed; else closeSocketIfAllStreamsClosed is invoked.
496:             */
497:            synchronized void close(ResponseHandler resph) {
498:                if (resph != (ResponseHandler) RespHandlerList.getFirst())
499:                    return;
500:
501:                if (DebugDemux)
502:                    System.err.println("Demux: Closing stream "
503:                            + resph.stream.hashCode() + " ("
504:                            + Thread.currentThread() + ")");
505:
506:                resph.eof = true;
507:                RespHandlerList.remove(resph);
508:
509:                if (resph == MarkedForClose)
510:                    close(new IOException("Premature end of Keep-Alive"), false);
511:                else
512:                    closeSocketIfAllStreamsClosed();
513:            }
514:
515:            /**
516:             * Close the socket if all the streams have been closed.
517:             *
518:             * <P>When a stream reaches eof it is removed from the response handler
519:             * list, but when somebody close()'s the response stream it is just
520:             * marked as such. This means that all responses in the list have either
521:             * not been read at all or only partially read, but they might have been
522:             * close()'d meaning that nobody is interested in the data. So If all the
523:             * response streams up till and including the one markedForClose have
524:             * been close()'d then we can remove them from our list and close the
525:             * socket.
526:             *
527:             * <P>Note: if the response list is emtpy or if no response is
528:             * markedForClose then this method does nothing. Specifically it does
529:             * not close the socket. We only want to close the socket if we've been
530:             * told to do so.
531:             *
532:             * <P>Also note that there might still be responses in the list after
533:             * the markedForClose one. These are due to us having pipelined more
534:             * requests to the server than it's willing to serve on a single
535:             * connection. These requests will be retried if possible.
536:             */
537:            synchronized void closeSocketIfAllStreamsClosed() {
538:                synchronized (RespHandlerList) {
539:                    ResponseHandler resph = (ResponseHandler) RespHandlerList
540:                            .enumerate();
541:
542:                    while (resph != null && resph.stream.closed) {
543:                        if (resph == MarkedForClose) {
544:                            // remove all response handlers first
545:                            ResponseHandler tmp;
546:                            do {
547:                                tmp = (ResponseHandler) RespHandlerList
548:                                        .getFirst();
549:                                RespHandlerList.remove(tmp);
550:                            } while (tmp != resph);
551:
552:                            // close the socket
553:                            close(
554:                                    new IOException(
555:                                            "Premature end of Keep-Alive"),
556:                                    false);
557:                            return;
558:                        }
559:
560:                        resph = (ResponseHandler) RespHandlerList.next();
561:                    }
562:                }
563:            }
564:
565:            /**
566:             * returns the socket associated with this demux
567:             */
568:            synchronized Socket getSocket() {
569:                if (MarkedForClose != null)
570:                    return null;
571:
572:                if (Timer != null)
573:                    Timer.hyber();
574:                return Sock;
575:            }
576:
577:            /**
578:             * Mark this demux to not accept any more request and to close the
579:             * stream after this <var>resp</var>onse or all requests have been
580:             * processed, or close immediately if no requests are registered.
581:             *
582:             * @param response the Response after which the connection should
583:             *                 be closed.
584:             */
585:            synchronized void markForClose(Response resp) {
586:                synchronized (RespHandlerList) {
587:                    if (RespHandlerList.getFirst() == null) // no active request,
588:                    { // so close the socket
589:                        close(new IOException("Premature end of Keep-Alive"),
590:                                false);
591:                        return;
592:                    }
593:                }
594:
595:                if (Timer != null) {
596:                    Timer.kill();
597:                    Timer = null;
598:                }
599:
600:                ResponseHandler resph, lasth = null;
601:                for (resph = (ResponseHandler) RespHandlerList.enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList
602:                        .next()) {
603:                    if (resph.resp == resp) // new resp precedes any others
604:                    {
605:                        MarkedForClose = resph;
606:
607:                        if (DebugDemux)
608:                            System.err.println("Demux: stream "
609:                                    + resp.inp_stream.hashCode()
610:                                    + " marked for close ("
611:                                    + Thread.currentThread() + ")");
612:
613:                        closeSocketIfAllStreamsClosed();
614:                        return;
615:                    }
616:
617:                    if (MarkedForClose == resph)
618:                        return; // already marked for closing after an earlier resp
619:
620:                    lasth = resph;
621:                }
622:
623:                if (lasth == null)
624:                    return;
625:
626:                MarkedForClose = lasth; // resp == null, so use last resph
627:                closeSocketIfAllStreamsClosed();
628:
629:                if (DebugDemux)
630:                    System.err.println("Demux: stream "
631:                            + lasth.stream.hashCode() + " marked for close ("
632:                            + Thread.currentThread() + ")");
633:            }
634:
635:            /**
636:             * Emergency stop. Closes the socket and notifies the responses that
637:             * the requests are aborted.
638:             *
639:             * @since V0.3
640:             */
641:            void abort() {
642:                if (DebugDemux)
643:                    System.err.println("Demux: Aborting socket ("
644:                            + this .hashCode() + ")");
645:
646:                // notify all responses of abort
647:
648:                synchronized (RespHandlerList) {
649:                    for (ResponseHandler resph = (ResponseHandler) RespHandlerList
650:                            .enumerate(); resph != null; resph = (ResponseHandler) RespHandlerList
651:                            .next()) {
652:                        if (resph.resp.http_resp != null)
653:                            resph.resp.http_resp.markAborted();
654:                        if (resph.exception == null)
655:                            resph.exception = new IOException(
656:                                    "Request aborted by user");
657:                    }
658:
659:                    /* Close the socket.
660:                     * Note: this duplicates most of close(IOException, boolean). We
661:                     * do *not* call close() because that is synchronized, but we want
662:                     * abort() to be asynch.
663:                     */
664:                    if (Sock != null) {
665:                        try {
666:                            try {
667:                                Sock.setSoLinger(false, 0);
668:                            } catch (Throwable t) {
669:                            }
670:
671:                            try {
672:                                Stream.close();
673:                            } catch (IOException ioe) {
674:                            }
675:                            try {
676:                                Sock.close();
677:                            } catch (IOException ioe) {
678:                            }
679:                            Sock = null;
680:
681:                            if (Timer != null) {
682:                                Timer.kill();
683:                                Timer = null;
684:                            }
685:                        } catch (NullPointerException npe) {
686:                        }
687:
688:                        Connection.DemuxList.remove(this );
689:                    }
690:                }
691:            }
692:
693:            /**
694:             * A safety net to close the connection.
695:             */
696:            protected void finalize() throws Throwable {
697:                close((IOException) null, false);
698:                super .finalize();
699:            }
700:
701:            /**
702:             * produces a string.
703:             * @return a string containing the class name and protocol number
704:             */
705:            public String toString() {
706:                String prot;
707:
708:                switch (Protocol) {
709:                case HTTP:
710:                    prot = "HTTP";
711:                    break;
712:                case HTTPS:
713:                    prot = "HTTPS";
714:                    break;
715:                case SHTTP:
716:                    prot = "SHTTP";
717:                    break;
718:                case HTTP_NG:
719:                    prot = "HTTP_NG";
720:                    break;
721:                default:
722:                    throw new Error(
723:                            "HTTPClient Internal Error: invalid protocol "
724:                                    + Protocol);
725:                }
726:
727:                return getClass().getName() + "[Protocol=" + prot + "]";
728:            }
729:        }
730:
731:        /**
732:         * This thread is used to implement socket timeouts. It keeps a list of
733:         * timer entries and expries them after a given time.
734:         */
735:        class SocketTimeout extends Thread implements  GlobalConstants {
736:            /**
737:             * This class represents a timer entry. It is used to close an
738:             * inactive socket after n seconds. Once running, the timer may be
739:             * suspended (hyber()), restarted (reset()), or aborted (kill()).
740:             * When the timer expires it invokes markForClose() on the
741:             * associated stream demultipexer.
742:             */
743:            class TimeoutEntry {
744:                boolean restart = false, hyber = false, alive = true;
745:                StreamDemultiplexor demux;
746:                TimeoutEntry next = null, prev = null;
747:
748:                TimeoutEntry(StreamDemultiplexor demux) {
749:                    this .demux = demux;
750:                }
751:
752:                void reset() {
753:                    hyber = false;
754:                    if (restart)
755:                        return;
756:                    restart = true;
757:
758:                    synchronized (time_list) {
759:                        if (!alive)
760:                            return;
761:
762:                        // remove from current position
763:                        next.prev = prev;
764:                        prev.next = next;
765:
766:                        // and add to end of timeout list
767:                        next = time_list[current];
768:                        prev = time_list[current].prev;
769:                        prev.next = this ;
770:                        next.prev = this ;
771:                    }
772:                }
773:
774:                void hyber() {
775:                    if (alive)
776:                        hyber = true;
777:                }
778:
779:                void kill() {
780:                    alive = false;
781:                    restart = false;
782:                    hyber = false;
783:
784:                    synchronized (time_list) {
785:                        if (prev == null)
786:                            return;
787:                        next.prev = prev;
788:                        prev.next = next;
789:                        prev = null;
790:                    }
791:                }
792:            }
793:
794:            private TimeoutEntry[] time_list;
795:            private int current;
796:
797:            SocketTimeout(int secs) {
798:                super ("SocketTimeout");
799:
800:                try {
801:                    setDaemon(true);
802:                } catch (SecurityException se) {
803:                } // Oh well...
804:                setPriority(MAX_PRIORITY);
805:
806:                time_list = new TimeoutEntry[secs];
807:                for (int idx = 0; idx < secs; idx++) {
808:                    time_list[idx] = new TimeoutEntry(null);
809:                    time_list[idx].next = time_list[idx].prev = time_list[idx];
810:                }
811:                current = 0;
812:            }
813:
814:            public TimeoutEntry setTimeout(StreamDemultiplexor demux) {
815:                TimeoutEntry entry = new TimeoutEntry(demux);
816:                synchronized (time_list) {
817:                    entry.next = time_list[current];
818:                    entry.prev = time_list[current].prev;
819:                    entry.prev.next = entry;
820:                    entry.next.prev = entry;
821:                }
822:
823:                return entry;
824:            }
825:
826:            /**
827:             * This timer is implemented by sleeping for 1 second and then
828:             * checking the timer list.
829:             */
830:            public void run() {
831:                TimeoutEntry marked = null;
832:
833:                while (true) {
834:                    try {
835:                        sleep(1000L);
836:                    } catch (InterruptedException ie) {
837:                    }
838:
839:                    synchronized (time_list) {
840:                        // reset all restart flags
841:                        for (TimeoutEntry entry = time_list[current].next; entry != time_list[current]; entry = entry.next) {
842:                            entry.restart = false;
843:                        }
844:
845:                        current++;
846:                        if (current >= time_list.length)
847:                            current = 0;
848:
849:                        // remove all expired timers 
850:                        for (TimeoutEntry entry = time_list[current].next; entry != time_list[current]; entry = entry.next) {
851:                            if (entry.alive && !entry.hyber) {
852:                                TimeoutEntry prev = entry.prev;
853:                                entry.kill();
854:                                /* put on death row. Note: we must not invoke
855:                                 * markForClose() here because it is synch'd
856:                                 * and can therefore lead to a deadlock if that
857:                                 * thread is trying to do a reset() or kill()
858:                                 */
859:                                entry.next = marked;
860:                                marked = entry;
861:                                entry = prev;
862:                            }
863:                        }
864:                    }
865:
866:                    while (marked != null) {
867:                        marked.demux.markForClose(null);
868:                        marked = marked.next;
869:                    }
870:                }
871:            }
872:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.