001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: * Free SoftwareFoundation, Inc.
023: * 59 Temple Place, Suite 330
024: * Boston, MA 02111-1307 USA
025: *
026: * @author Scott Ferguson
027: */
028:
029: package com.caucho.vfs;
030:
031: import java.io.IOException;
032: import java.io.InputStream;
033: import java.io.OutputStream;
034: import java.net.Socket;
035:
036: /**
037: * Specialized stream to handle sockets.
038: *
039: * <p>Unlike VfsStream, when the read() throws and IOException or
040: * a SocketException, SocketStream will throw a ClientDisconnectException.
041: */
042: public class SocketStream extends StreamImpl {
043: private static byte[] UNIX_NEWLINE = new byte[] { (byte) '\n' };
044:
045: private Socket _s;
046: private InputStream _is;
047: private OutputStream _os;
048: private boolean _needsFlush;
049: private byte[] _newline = UNIX_NEWLINE;
050:
051: private boolean _throwReadInterrupts = false;
052: private int _socketTimeout;
053:
054: private long _totalReadBytes;
055: private long _totalWriteBytes;
056:
057: public SocketStream() {
058: }
059:
060: public SocketStream(Socket s) {
061: init(s);
062: }
063:
064: /**
065: * Initialize the SocketStream with a new Socket.
066: *
067: * @param s the new socket.
068: */
069: public void init(Socket s) {
070: _s = s;
071:
072: _is = null;
073: _os = null;
074: _needsFlush = false;
075: }
076:
077: /**
078: * Initialize the SocketStream with a new Socket.
079: *
080: * @param s the new socket.
081: */
082: public void init(InputStream is, OutputStream os) {
083: _is = is;
084: _os = os;
085: _needsFlush = false;
086: }
087:
088: /**
089: * If true, throws read interrupts instead of returning an end of
090: * fail. Defaults to false.
091: */
092: public void setThrowReadInterrupts(boolean allowThrow) {
093: _throwReadInterrupts = allowThrow;
094: }
095:
096: /**
097: * If true, throws read interrupts instead of returning an end of
098: * fail. Defaults to false.
099: */
100: public boolean getThrowReadInterrupts() {
101: return _throwReadInterrupts;
102: }
103:
104: public void setNewline(byte[] newline) {
105: _newline = newline;
106: }
107:
108: public byte[] getNewline() {
109: return _newline;
110: }
111:
112: /**
113: * Returns true if stream is readable and bytes can be skipped.
114: */
115: public boolean hasSkip() {
116: return canRead();
117: }
118:
119: /**
120: * Skips bytes in the file.
121: *
122: * @param n the number of bytes to skip
123: *
124: * @return the actual bytes skipped.
125: */
126: public long skip(long n) throws IOException {
127: if (_is == null) {
128: if (_s == null)
129: return -1;
130:
131: _is = _s.getInputStream();
132: }
133:
134: return _is.skip(n);
135: }
136:
137: /**
138: * Returns true since the socket stream can be read.
139: */
140: public boolean canRead() {
141: return _is != null || _s != null;
142: }
143:
144: /**
145: * Reads bytes from the socket.
146: *
147: * @param buf byte buffer receiving the bytes
148: * @param offset offset into the buffer
149: * @param length number of bytes to read
150: * @return number of bytes read or -1
151: * @exception throws ClientDisconnectException if the connection is dropped
152: */
153: public int read(byte[] buf, int offset, int length)
154: throws IOException {
155: try {
156: if (_is == null) {
157: if (_s == null)
158: return -1;
159:
160: _is = _s.getInputStream();
161: }
162:
163: int readLength = _is.read(buf, offset, length);
164:
165: if (readLength >= 0)
166: _totalReadBytes += readLength;
167:
168: return readLength;
169: } catch (IOException e) {
170: if (_throwReadInterrupts)
171: throw e;
172:
173: // server/0611
174: /*
175: try {
176: close();
177: } catch (IOException e1) {
178: }
179: */
180:
181: return -1;
182: }
183: }
184:
185: /**
186: * Reads bytes from the socket.
187: *
188: * @param buf byte buffer receiving the bytes
189: * @param offset offset into the buffer
190: * @param length number of bytes to read
191: * @return number of bytes read or -1
192: * @exception throws ClientDisconnectException if the connection is dropped
193: */
194: public int readTimeout(byte[] buf, int offset, int length,
195: long timeout) throws IOException {
196: Socket s = _s;
197:
198: if (s == null)
199: return -1;
200:
201: int oldTimeout = s.getSoTimeout();
202: ;
203:
204: try {
205: s.setSoTimeout((int) timeout);
206:
207: return read(buf, offset, length);
208: } finally {
209: s.setSoTimeout(oldTimeout);
210: }
211: }
212:
213: /**
214: * Returns the number of bytes available to be read from the input stream.
215: */
216: public int getAvailable() throws IOException {
217: if (_is == null) {
218: if (_s == null)
219: return -1;
220:
221: _is = _s.getInputStream();
222: }
223:
224: return _is.available();
225: }
226:
227: public boolean canWrite() {
228: return _os != null || _s != null;
229: }
230:
231: /**
232: * Writes bytes to the socket.
233: *
234: * @param buf byte buffer containing the bytes
235: * @param offset offset into the buffer
236: * @param length number of bytes to read
237: * @param isEnd if the write is at a close.
238: *
239: * @exception throws ClientDisconnectException if the connection is dropped
240: */
241: public void write(byte[] buf, int offset, int length, boolean isEnd)
242: throws IOException {
243: if (_os == null) {
244: if (_s == null)
245: return;
246:
247: _os = _s.getOutputStream();
248: }
249:
250: try {
251: _needsFlush = true;
252: _os.write(buf, offset, length);
253: _totalWriteBytes += length;
254: } catch (IOException e) {
255: try {
256: close();
257: } catch (IOException e1) {
258: }
259:
260: throw ClientDisconnectException.create(e);
261: }
262: }
263:
264: /**
265: * Flushes the socket.
266: */
267: public void flush() throws IOException {
268: if (_os == null || !_needsFlush)
269: return;
270:
271: _needsFlush = false;
272: try {
273: _os.flush();
274: } catch (IOException e) {
275: try {
276: close();
277: } catch (IOException e1) {
278: }
279:
280: throw ClientDisconnectException.create(e);
281: }
282: }
283:
284: public void resetTotalBytes() {
285: _totalReadBytes = 0;
286: _totalWriteBytes = 0;
287: }
288:
289: public long getTotalReadBytes() {
290: return _totalReadBytes;
291: }
292:
293: public long getTotalWriteBytes() {
294: return _totalWriteBytes;
295: }
296:
297: /**
298: * Closes the write half of the stream.
299: */
300: public void closeWrite() throws IOException {
301: OutputStream os = _os;
302: _os = null;
303:
304: // since the output stream is opened lazily, we might
305: // need to open it
306: if (_s != null)
307: _s.shutdownOutput();
308: /*
309: if (os != null)
310: os.close();
311: */
312: }
313:
314: /**
315: * Closes the underlying sockets and socket streams.
316: */
317: public void close() throws IOException {
318: Socket s = _s;
319: _s = null;
320:
321: OutputStream os = _os;
322: _os = null;
323:
324: InputStream is = _is;
325: _is = null;
326:
327: try {
328: if (os != null)
329: os.close();
330:
331: if (is != null)
332: is.close();
333: } finally {
334: if (s != null)
335: s.close();
336: }
337: }
338: }
|