001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.server.hmux;
031:
032: import com.caucho.log.Log;
033: import com.caucho.util.Alarm;
034: import com.caucho.vfs.*;
035:
036: import java.io.IOException;
037: import java.io.InputStream;
038: import java.io.OutputStream;
039: import java.net.ConnectException;
040: import java.net.Socket;
041: import java.net.SocketException;
042: import java.util.HashMap;
043: import java.util.Iterator;
044: import java.util.logging.Level;
045: import java.util.logging.Logger;
046:
/**
 * Underlying stream that sends a request to a server over an HMUX socket
 * connection and reads back the response.
 */
050: class HmuxStream extends StreamImpl {
// Logger for this class.
private static final Logger log = Log.open(HmuxStream.class);
// reserved headers that should not be passed to the HTTP server
// (lower-case header names; filled in by the static initializer)
private static HashMap<String, String> _reserved;

// Guards _savedStream and _saveTime.
private static final Object LOCK = new Object();

// Saved keepalive stream for a new request.
private static HmuxStream _savedStream;
// Time the stream was saved
private static long _saveTime;

// NOTE(review): not read by any method visible in this class.
private long _socketTimeout = 30000L;

// Flag stored by setSSL() and returned by isSSL(); nothing else in this
// class reads it.
private boolean _isSSL;

// The connected socket and its raw input/output streams.
private Socket _s;
private InputStream _is;
private OutputStream _os;
// VFS read/write streams opened on top of _is and _os.
private ReadStream _rs;
private WriteStream _ws;

// The server's host name
private String _host;
// The server's port
private int _port;

// Virtual host taken from the HmuxPath; when null the path's own host is
// sent as the server name.
private String _virtualHost;

// the method
private String _method;
// true for a HEAD stream
private boolean _isHead;
// true for a POST stream
private boolean _isPost;

// buffer containing the POST data
private MemoryStream _tempStream;

// true if keepalive is allowed
private boolean _isKeepalive = true;
// true after the request has been sent
private boolean _didGet;
// bytes remaining in the current data chunk, -1 on eof
private int _chunkLength;
// the request is done
private boolean _isRequestDone;

// Request headers until the request is sent; cleared after sending, then
// filled with response values such as "status".
private HashMap<String, Object> _attributes;

// Used to read unread bytes on recycle.
private byte[] _tempBuffer;
102:
103: /**
104: * Create a new HTTP stream.
105: */
106: private HmuxStream(Path path, String host, int port, Socket s)
107: throws IOException {
108: _s = s;
109:
110: _host = host;
111: _port = port;
112:
113: _is = _s.getInputStream();
114: _os = _s.getOutputStream();
115:
116: _ws = VfsStream.openWrite(_os);
117: _rs = VfsStream.openRead(_is, _ws);
118:
119: _attributes = new HashMap<String, Object>();
120:
121: init(path);
122: }
123:
124: /**
125: * Opens a new HTTP stream for reading, i.e. a GET request.
126: *
127: * @param path the URL for the stream
128: *
129: * @return the opened stream
130: */
131: static HmuxStreamWrapper openRead(HmuxPath path) throws IOException {
132: HmuxStream stream = createStream(path);
133: stream._isPost = false;
134:
135: return new HmuxStreamWrapper(stream);
136: }
137:
138: /**
139: * Opens a new HTTP stream for reading and writing, i.e. a POST request.
140: *
141: * @param path the URL for the stream
142: *
143: * @return the opened stream
144: */
145: static HmuxStreamWrapper openReadWrite(HmuxPath path)
146: throws IOException {
147: HmuxStream stream = createStream(path);
148: stream._isPost = true;
149:
150: return new HmuxStreamWrapper(stream);
151: }
152:
153: /**
154: * Creates a new HTTP stream. If there is a saved connection to
155: * the same host, use it.
156: *
157: * @param path the URL for the stream
158: *
159: * @return the opened stream
160: */
161: static private HmuxStream createStream(HmuxPath path)
162: throws IOException {
163: String host = path.getHost();
164: int port = path.getPort();
165:
166: HmuxStream stream = null;
167: long streamTime = 0;
168: synchronized (LOCK) {
169: if (_savedStream != null
170: && host.equals(_savedStream.getHost())
171: && port == _savedStream.getPort()) {
172: stream = _savedStream;
173: streamTime = _saveTime;
174: _savedStream = null;
175: }
176: }
177:
178: if (stream == null) {
179: }
180: // if the stream is still valid, use it
181: else if (Alarm.getCurrentTime() < streamTime + 5000) {
182: stream.init(path);
183: return stream;
184: }
185: // if the stream has timed out, close it
186: else {
187: try {
188: stream._isKeepalive = false;
189: stream.close();
190: } catch (IOException e) {
191: log.log(Level.FINE, e.toString(), e);
192: }
193: }
194:
195: Socket s;
196:
197: try {
198: s = new Socket(host, port);
199: } catch (ConnectException e) {
200: throw new ConnectException(path.getURL() + ": "
201: + e.getMessage());
202: } catch (Exception e) {
203: throw new ConnectException(path.getURL() + ": "
204: + e.toString());
205: }
206:
207: int socketTimeout = 300 * 1000;
208:
209: try {
210: s.setSoTimeout(socketTimeout);
211: } catch (Exception e) {
212: }
213:
214: return new HmuxStream(path, host, port, s);
215: }
216:
217: /**
218: * Initializes the stream for the next request.
219: */
220: private void init(Path path) {
221: _isRequestDone = false;
222: _didGet = false;
223: _isPost = false;
224: _isHead = false;
225: _method = null;
226: _attributes.clear();
227:
228: setPath(path);
229:
230: if (path instanceof HmuxPath)
231: _virtualHost = ((HmuxPath) path).getVirtualHost();
232: }
233:
234: /**
235: * Set if this should be an SSL connection.
236: */
237: public void setSSL(boolean isSSL) {
238: _isSSL = isSSL;
239: }
240:
241: /**
242: * Set if this should be an SSL connection.
243: */
244: public boolean isSSL() {
245: return _isSSL;
246: }
247:
248: /**
249: * Sets the method
250: */
251: public void setMethod(String method) {
252: _method = method;
253: }
254:
255: /**
256: * Sets true if we're only interested in the head.
257: */
258: public void setHead(boolean isHead) {
259: _isHead = isHead;
260: }
261:
262: /**
263: * Returns the stream's host.
264: */
265: public String getHost() {
266: return _host;
267: }
268:
269: /**
270: * Returns the stream's port.
271: */
272: public int getPort() {
273: return _port;
274: }
275:
276: /**
277: * Returns a header from the response returned from the HTTP server.
278: *
279: * @param name name of the header
280: * @return the header value.
281: */
282: public Object getAttribute(String name) throws IOException {
283: if (!_didGet)
284: getConnInput();
285:
286: return _attributes.get(name.toLowerCase());
287: }
288:
289: /**
290: * Returns an iterator of the returned header names.
291: */
292: public Iterator getAttributeNames() throws IOException {
293: if (!_didGet)
294: getConnInput();
295:
296: return _attributes.keySet().iterator();
297: }
298:
299: /**
300: * Sets a header for the request.
301: */
302: public void setAttribute(String name, Object value) {
303: if (name.equals("method"))
304: setMethod((String) value);
305: else if (name.equals("socket-timeout")) {
306: if (value instanceof Integer) {
307: int socketTimeout = ((Integer) value).intValue();
308:
309: if (socketTimeout > 0) {
310: try {
311: if (_s != null)
312: _s.setSoTimeout(socketTimeout);
313: } catch (Exception e) {
314:
315: }
316: }
317: }
318: } else
319: _attributes.put(name.toLowerCase(), value);
320: }
321:
322: /**
323: * Remove a header for the request.
324: */
325: public void removeAttribute(String name) {
326: _attributes.remove(name.toLowerCase());
327: }
328:
329: /**
330: * Sets the timeout.
331: */
332: public void setSocketTimeout(long timeout) throws SocketException {
333: if (_s != null)
334: _s.setSoTimeout((int) timeout);
335: }
336:
337: /**
338: * The stream is always writable (?)
339: */
340: public boolean canWrite() {
341: return true;
342: }
343:
344: /**
345: * Writes a buffer to the underlying stream.
346: *
347: * @param buffer the byte array to write.
348: * @param offset the offset into the byte array.
349: * @param length the number of bytes to write.
350: * @param isEnd true when the write is flushing a close.
351: */
352: public void write(byte[] buf, int offset, int length, boolean isEnd)
353: throws IOException {
354: if (!_isPost)
355: return;
356:
357: if (_tempStream == null)
358: _tempStream = new MemoryStream();
359:
360: _tempStream.write(buf, offset, length, isEnd);
361: }
362:
363: /**
364: * The stream is readable.
365: */
366: public boolean canRead() {
367: return true;
368: }
369:
370: /**
371: * Read data from the connection. If the request hasn't yet been sent
372: * to the server, send it.
373: */
374: public int read(byte[] buf, int offset, int length)
375: throws IOException {
376: try {
377: return readInt(buf, offset, length);
378: } catch (IOException e) {
379: _isKeepalive = false;
380: throw e;
381: } catch (RuntimeException e) {
382: _isKeepalive = false;
383: throw e;
384: }
385: }
386:
387: /**
388: * Read data from the connection. If the request hasn't yet been sent
389: * to the server, send it.
390: */
391: public int readInt(byte[] buf, int offset, int length)
392: throws IOException {
393: if (!_didGet)
394: getConnInput();
395:
396: if (_isRequestDone)
397: return -1;
398:
399: try {
400: int len = length;
401:
402: if (_chunkLength == 0) {
403: if (!readData())
404: _chunkLength = -1;
405: }
406:
407: if (_chunkLength < 0)
408: return -1;
409:
410: if (_chunkLength < len)
411: len = _chunkLength;
412:
413: len = _rs.read(buf, offset, len);
414:
415: if (len < 0) {
416: } else
417: _chunkLength -= len;
418:
419: return len;
420: } catch (IOException e) {
421: _isKeepalive = false;
422: throw e;
423: } catch (RuntimeException e) {
424: _isKeepalive = false;
425: throw e;
426: }
427: }
428:
429: /**
430: * Sends the request and initializes the response.
431: */
432: private void getConnInput() throws IOException {
433: if (_didGet)
434: return;
435:
436: try {
437: getConnInputImpl();
438: } catch (IOException e) {
439: _isKeepalive = false;
440: throw e;
441: } catch (RuntimeException e) {
442: _isKeepalive = false;
443: throw e;
444: }
445: }
446:
447: /**
448: * Send the request to the server, wait for the response and parse
449: * the headers.
450: */
451: private void getConnInputImpl() throws IOException {
452: if (_didGet)
453: return;
454:
455: _didGet = true;
456:
457: _ws.write('C');
458: _ws.write(0);
459: _ws.write(0);
460:
461: if (_method != null) {
462: writeString(HmuxRequest.HMUX_METHOD, _method);
463: } else if (_isPost) {
464: writeString(HmuxRequest.HMUX_METHOD, "POST");
465: } else if (_isHead)
466: writeString(HmuxRequest.HMUX_METHOD, "HEAD");
467: else
468: writeString(HmuxRequest.HMUX_METHOD, "GET");
469:
470: if (_virtualHost != null)
471: writeString(HmuxRequest.HMUX_SERVER_NAME, _virtualHost);
472: else {
473: writeString(HmuxRequest.HMUX_SERVER_NAME, _path.getHost());
474: _ws.print(_path.getHost());
475: if (_path.getPort() != 80) {
476: writeString(HmuxRequest.CSE_SERVER_PORT, String
477: .valueOf(_path.getPort()));
478: }
479: }
480:
481: // Not splitting query? Also fullpath?
482: writeString(HmuxRequest.HMUX_URI, _path.getPath());
483:
484: if (_path.getQuery() != null)
485: writeString(HmuxRequest.CSE_QUERY_STRING, _path.getQuery());
486:
487: Iterator iter = getAttributeNames();
488: while (iter.hasNext()) {
489: String name = (String) iter.next();
490: if (_reserved.get(name.toLowerCase()) == null) {
491: writeString(HmuxRequest.HMUX_HEADER, name);
492: writeString(HmuxRequest.HMUX_STRING, getAttribute(name));
493: }
494: }
495:
496: if (_isPost) {
497: MemoryStream tempStream = _tempStream;
498: _tempStream = null;
499: if (tempStream != null) {
500: TempBuffer tb = TempBuffer.allocate();
501: byte[] buffer = tb.getBuffer();
502: int sublen;
503:
504: ReadStream postIn = tempStream.openReadAndSaveBuffer();
505:
506: while ((sublen = postIn.read(buffer, 0, buffer.length)) > 0) {
507: _ws.write('D');
508: _ws.write(sublen >> 8);
509: _ws.write(sublen);
510: _ws.write(buffer, 0, sublen);
511: }
512:
513: tempStream.destroy();
514:
515: TempBuffer.free(tb);
516: tb = null;
517: }
518: }
519:
520: _attributes.clear();
521:
522: _ws.write('Q');
523:
524: readData();
525:
526: if (_isHead)
527: _isRequestDone = true;
528: }
529:
530: private void writeString(int code, String string)
531: throws IOException {
532: WriteStream ws = _ws;
533:
534: ws.write((byte) code);
535: int len = string.length();
536: ws.write(len >> 8);
537: ws.write(len);
538: ws.print(string);
539: }
540:
541: private void writeString(int code, Object obj) throws IOException {
542: String string = String.valueOf(obj);
543:
544: WriteStream ws = _ws;
545:
546: ws.write((byte) code);
547: int len = string.length();
548: ws.write(len >> 8);
549: ws.write(len);
550: ws.print(string);
551: }
552:
553: /**
554: * Parse the headers returned from the server.
555: */
556: private boolean readData() throws IOException {
557: boolean isDebug = log.isLoggable(Level.FINE);
558:
559: int code;
560:
561: ReadStream is = _rs;
562:
563: while ((code = is.read()) > 0) {
564: switch (code) {
565: case HmuxRequest.HMUX_CHANNEL:
566: is.read();
567: is.read();
568: break;
569: case HmuxRequest.HMUX_QUIT:
570: case HmuxRequest.HMUX_EXIT:
571: is.close();
572:
573: if (isDebug)
574: log.fine("HMUX: " + (char) code);
575:
576: return false;
577:
578: case HmuxRequest.HMUX_YIELD:
579: break;
580:
581: case HmuxRequest.HMUX_STATUS:
582: String value = readString(is);
583: _attributes.put("status", value.substring(0, 3));
584:
585: if (isDebug)
586: log.fine("HMUX: " + (char) code + " " + value);
587: break;
588:
589: case HmuxRequest.HMUX_DATA:
590: _chunkLength = 256 * (is.read() & 0xff)
591: + (is.read() & 0xff);
592:
593: if (isDebug)
594: log.fine("HMUX: " + (char) code + " "
595: + _chunkLength);
596:
597: return true;
598:
599: default:
600: int len = 256 * (is.read() & 0xff) + (is.read() & 0xff);
601:
602: if (isDebug)
603: log.fine("HMUX: " + (char) code + " " + len);
604:
605: is.skip(len);
606: break;
607: }
608: }
609:
610: return false;
611: }
612:
613: private String readString(ReadStream is) throws IOException {
614: int len = 256 * (is.read() & 0xff) + is.read();
615:
616: char[] buf = new char[len];
617:
618: is.readAll(buf, 0, len);
619:
620: return new String(buf);
621: }
622:
623: /**
624: * Returns the bytes still available.
625: */
626: public int getAvailable() throws IOException {
627: if (!_didGet)
628: getConnInput();
629:
630: return _rs.getAvailable();
631: }
632:
633: /**
634: * Close the connection.
635: */
636: public void close() throws IOException {
637: if (_isKeepalive) {
638: // If recycling, read any unread data
639: if (!_didGet)
640: getConnInput();
641:
642: if (!_isRequestDone) {
643: if (_tempBuffer == null)
644: _tempBuffer = new byte[256];
645:
646: try {
647: while (read(_tempBuffer, 0, _tempBuffer.length) > 0) {
648: }
649: } catch (IOException e) {
650: _isKeepalive = false;
651: }
652: }
653: }
654:
655: if (com.caucho.server.util.CauchoSystem.isTesting())
656: _isKeepalive = false; // XXX:
657:
658: if (_isKeepalive) {
659: HmuxStream oldSaved;
660: long now = Alarm.getCurrentTime();
661: synchronized (LOCK) {
662: oldSaved = _savedStream;
663: _savedStream = this ;
664: _saveTime = now;
665: }
666:
667: if (oldSaved != null && oldSaved != this ) {
668: oldSaved._isKeepalive = false;
669: oldSaved.close();
670: }
671:
672: return;
673: }
674:
675: try {
676: try {
677: if (_ws != null)
678: _ws.close();
679: } catch (Throwable e) {
680: }
681: _ws = null;
682:
683: try {
684: if (_rs != null)
685: _rs.close();
686: } catch (Throwable e) {
687: }
688: _rs = null;
689:
690: try {
691: if (_os != null)
692: _os.close();
693: } catch (Throwable e) {
694: }
695: _os = null;
696:
697: try {
698: if (_is != null)
699: _is.close();
700: } catch (Throwable e) {
701: }
702: _is = null;
703: } finally {
704: if (_s != null)
705: _s.close();
706: _s = null;
707: }
708: }
709:
710: static {
711: _reserved = new HashMap<String, String>();
712: _reserved.put("user-agent", "");
713: _reserved.put("content-length", "");
714: _reserved.put("content-encoding", "");
715: _reserved.put("connection", "");
716: _reserved.put("host", "");
717: }
718: }
|