001: /*
002: * This file is part of the QuickServer library
003: * Copyright (C) QuickServer.org
004: *
005: * Use, modification, copying and distribution of this software is subject to
006: * the terms and conditions of the GNU Lesser General Public License.
007: * You should have received a copy of the GNU LGP License along with this
008: * library; if not, you can download a copy from <http://www.quickserver.org/>.
009: *
010: * For questions, suggestions, bug-reports, enhancement-requests etc.
011: * visit http://www.quickserver.org
012: *
013: */
014:
015: package org.quickserver.util.io;
016:
017: import java.io.*;
018: import java.nio.*;
019: import java.nio.charset.*;
020: import java.util.*;
021: import org.apache.commons.pool.ObjectPool;
022: import org.quickserver.net.server.ClientHandler;
023: import java.util.logging.*;
024: import org.quickserver.util.*;
025:
026: /**
027: * This is an InputStream constructed from list of ByteBuffers. This is
028: * used in non-blocking mode.
029: * @since 1.4.5
030: * @author Akshathkumar Shetty
031: */
032: public class ByteBufferInputStream extends InputStream {
033: private static Logger logger = Logger
034: .getLogger(ByteBufferInputStream.class.getName());
035: static {
036: logger.setLevel(Level.INFO);
037: }
038:
039: /**
040: * Sets the debug flag.
041: */
042: public static void setDebug(boolean flag) {
043: if (flag)
044: logger.setLevel(Level.FINEST);
045: else
046: logger.setLevel(Level.INFO);
047: }
048:
049: /**
050: * @since 1.4.7
051: */
052: public static boolean isLoggable(Level level) {
053: return logger.isLoggable(level);
054: }
055:
056: private ArrayList bufferList;
057: private ClientHandler handler;
058:
059: private CharsetDecoder decoder;
060: private CharsetEncoder encoder;
061: private StringBuffer strings;
062:
063: private int pos = 0;
064: private int index = -1;
065: private int start = 0;
066: private boolean lookingForLineFeed = false;
067:
068: public ByteBufferInputStream(ArrayList bufferList,
069: ClientHandler handler, String charset) {
070: if (bufferList == null || handler == null)
071: throw new IllegalArgumentException(
072: "ArrayList or ClientHandler was null.");
073: this .bufferList = bufferList;
074: this .handler = handler;
075: Charset _charset = Charset.forName(charset);
076: decoder = _charset.newDecoder();
077: encoder = _charset.newEncoder();
078: strings = new StringBuffer();
079: }
080:
081: public synchronized int availableOnlyInByteBuffer() {
082: int count = 0;
083: ByteBuffer byteBuffer = null;
084: int size = bufferList.size();
085: for (int c = 0; c < size; c++) {
086: byteBuffer = (ByteBuffer) bufferList.get(c);
087: count += byteBuffer.remaining();
088: }
089: logger.finest("count: " + count);
090: return count;
091: }
092:
093: public synchronized int available() {
094: int count = 0;
095: ByteBuffer byteBuffer = null;
096:
097: if (lookingForLineFeed) {
098: char c = '\0';
099: if (strings.length() != 0) {
100: c = strings.charAt(0);
101: if (c == '\n') {
102: strings.deleteCharAt(0);
103: lookingForLineFeed = false;
104: }
105: } else {
106: while (bufferList.size() != 0) {
107: byteBuffer = (ByteBuffer) bufferList.get(0);
108: if (byteBuffer.remaining() == 0) {
109: returnBufferBack();
110: continue;
111: }
112:
113: int p = byteBuffer.position();
114: c = (char) byteBuffer.get(p);
115: if (c == '\n') {
116: byteBuffer.get();//move position
117: lookingForLineFeed = false;
118: }
119: break;
120: }//end of while
121: }
122: }
123: count += strings.length();
124:
125: int size = bufferList.size();
126: for (int c = 0; c < size; c++) {
127: byteBuffer = (ByteBuffer) bufferList.get(c);
128: count += byteBuffer.remaining();
129: }
130: //logger.finest("count: "+count);
131: return count;
132: }
133:
134: public synchronized void close() throws IOException {
135: if (handler.getSocketChannel() != null)
136: handler.getSocketChannel().close();
137: //handler.closeConnection();
138: }
139:
140: public boolean markSupported() {
141: return false;
142: }
143:
144: public synchronized int read() throws IOException {
145: handler.isConnected();
146: if (strings.length() != 0) {
147: addStringsBackAsBuffer();
148: }
149:
150: if (bufferList.size() == 0) {
151: try {
152: wait();
153: } catch (InterruptedException ie) {
154: logger.warning("InterruptedException: " + ie);
155: return -1;
156: }
157: if (bufferList.size() == 0)
158: return -1;
159: }
160: ByteBuffer byteBuffer = null;
161: while (bufferList.size() != 0) {
162: byteBuffer = (ByteBuffer) bufferList.get(0);
163: if (byteBuffer.remaining() == 0) {
164: returnBufferBack();
165: continue;
166: }
167:
168: if (lookingForLineFeed) {
169: int lflfChar = (int) byteBuffer.get();
170: lookingForLineFeed = false;
171: if (lflfChar == (int) '\n') {
172: continue;
173: } else {
174: return lflfChar;
175: }
176: } else {
177: return (int) byteBuffer.get();
178: }
179: }
180: return read();
181: }
182:
183: public int read(byte[] b) throws IOException {
184: return read(b, 0, b.length);
185: }
186:
187: public synchronized int read(byte[] b, int off, int len)
188: throws IOException {
189: handler.isConnected();
190: if (strings.length() != 0) {
191: addStringsBackAsBuffer();
192: }
193:
194: if (bufferList.size() == 0) {
195: try {
196: wait();
197: } catch (InterruptedException ie) {
198: logger.warning("InterruptedException: " + ie);
199: //ie.printStackTrace();
200: return -1;
201: }
202: if (bufferList.size() == 0)
203: return -1;
204: }
205: ByteBuffer byteBuffer = null;
206: int read = 0;
207: int remaining = 0;
208: int toRead = len;
209: do {
210: byteBuffer = (ByteBuffer) bufferList.get(0);
211: remaining = byteBuffer.remaining();
212:
213: if (remaining == 0) {
214: returnBufferBack();
215: continue;
216: }
217:
218: if (lookingForLineFeed) {
219: int p = byteBuffer.position();
220: byte lflfChar = byteBuffer.get(p);
221: lookingForLineFeed = false;
222:
223: if (lflfChar == (byte) '\n') {
224: byteBuffer.get();//move position
225: continue;
226: }
227: }
228:
229: if (remaining < toRead) {
230: byteBuffer.get(b, off, remaining);
231: off = off + remaining;
232:
233: read = read + remaining;
234: toRead = toRead - remaining;
235: } else {
236: byteBuffer.get(b, off, toRead);
237: read = read + toRead;
238: return read;
239: }
240: } while (bufferList.size() != 0);
241: return read;
242: }
243:
244: public long skip(long n) throws IOException {
245: if (n < 0)
246: return 0;
247: int s = 0;
248: for (; s < n; s++) {
249: if (read() == -1)
250: break;
251: }
252: return s;
253: }
254:
255: private void addStringsBackAsBuffer() {
256: try {
257: ByteBuffer borrowBuffer = null;
258: ByteBuffer bb = encoder.encode(CharBuffer.wrap(strings));
259: strings.setLength(0);
260: do {
261: if (borrowBuffer == null) {
262: borrowBuffer = (ByteBuffer) handler.getServer()
263: .getByteBufferPool().borrowObject();
264: }
265:
266: borrowBuffer.put(bb.get());
267:
268: if (borrowBuffer.hasRemaining() == false) {
269: borrowBuffer.flip();
270: bufferList.add(0, borrowBuffer);
271: borrowBuffer = null;
272: }
273: } while (bb.hasRemaining());
274:
275: if (borrowBuffer != null) {
276: borrowBuffer.flip();
277: bufferList.add(0, borrowBuffer);
278: }
279: } catch (Exception er) {
280: logger.warning("Error : " + er);
281: }
282: start = 0;
283: index = -1;
284: pos = 0;
285: }
286:
287: private void returnBufferBack() {
288: returnBufferBack((ByteBuffer) bufferList.remove(0));
289: }
290:
291: private void returnBufferBack(ByteBuffer byteBuffer) {
292: try {
293: handler.getServer().getByteBufferPool().returnObject(
294: byteBuffer);
295: } catch (Exception er) {
296: logger.warning("Error while returning ByteBuffer to pool: "
297: + er);
298: }
299: }
300:
301: //-- extra helpers
302: /**
303: * Checks if a line of String is ready to be read.
304: * @throws IOException if connection is lost or closed.
305: */
306: public synchronized boolean isLineReady() throws IOException {
307: handler.isConnected();
308: boolean result = false;
309:
310: result = isLineReadyForStringBuffer();
311:
312: if (result == true || bufferList.size() == 0) {
313: if (logger.isLoggable(Level.FINEST))
314: logger.finest("result: " + result);
315: return result;
316: }
317:
318: ByteBuffer byteBuffer = null;
319: CharBuffer charBuffer = null;
320:
321: while (result == false && bufferList.size() != 0) {
322: byteBuffer = (ByteBuffer) bufferList.get(0);
323: if (byteBuffer.remaining() == 0) {
324: returnBufferBack();
325: continue;
326: }
327: charBuffer = decoder.decode(byteBuffer);
328: if (charBuffer == null) {
329: returnBufferBack();
330: continue;
331: }
332:
333: strings.append(charBuffer);
334: returnBufferBack();
335:
336: result = isLineReadyForStringBuffer();
337: }//end of while
338:
339: if (logger.isLoggable(Level.FINEST))
340: logger.finest("result: " + result);
341: return result;
342: }
343:
344: private boolean isLineReadyForStringBuffer() {
345: if (index != -1)
346: return true;
347:
348: int stringsLength = strings.length();
349:
350: while (pos < stringsLength) {
351: char c = strings.charAt(pos);
352:
353: if (c == '\n') {
354: if (lookingForLineFeed) {
355: strings.deleteCharAt(0);
356: stringsLength--;
357: lookingForLineFeed = false;
358: continue;
359: } else {
360: index = pos;
361: pos++;
362: return true;
363: }
364: }
365: if (c == '\r') {
366: index = pos;
367: lookingForLineFeed = true;
368: pos++;
369: return true;
370: } else {
371: pos++;
372: lookingForLineFeed = false;
373: }
374: }
375: return false;
376: }
377:
378: /**
379: * Reads a line of String if ready. If line is not yet ready this will
380: * block. To find out if the line is ready use <code>isLineReady()</code>
381: * @see #isLineReady()
382: */
383: public synchronized String readLine() throws IOException {
384: if (index == -1) {
385: while (isLineReady() == false) {
386: try {
387: wait();
388: } catch (InterruptedException ie) {
389: logger.warning("InterruptedException: " + ie);
390: return null;
391: }
392: }
393: }
394:
395: int stringsLength = strings.length();
396:
397: Assertion.affirm(index <= stringsLength);
398: String data = strings.substring(start, index);
399:
400: if (pos < stringsLength)
401: strings.delete(0, pos);
402: else
403: strings.setLength(0);
404:
405: start = 0;
406: pos = start;
407: index = -1;
408: return data;
409: }
410:
411: public void dumpContent() {
412: if (logger.isLoggable(Level.FINE) == false) {
413: logger
414: .warning("Can't precede. Logging level FINE is not luggable! ");
415: return;
416: }
417:
418: logger.fine("Start of dump..");
419: synchronized (bufferList) {
420: int size = bufferList.size();
421: ByteBuffer byteBuffer = null;
422: if (strings.length() != 0) {
423: logger.fine("[decoded] " + strings);
424: }
425: for (int c = 0; c < size; c++) {
426: byteBuffer = (ByteBuffer) bufferList.get(c);
427: try {
428: logger.fine("[" + c + "] "
429: + decoder.decode(byteBuffer.duplicate()));
430: } catch (Exception e) {
431: logger.fine("[" + c + "] Error : " + e);
432: }
433: }
434: }
435: logger.fine("End of dump..");
436: }
437: }
|