001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package java.nio.channels;
019:
020: import java.io.IOException;
021: import java.io.InputStream;
022: import java.io.OutputStream;
023: import java.io.Reader;
024: import java.io.Writer;
025: import java.nio.ByteBuffer;
026: import java.nio.CharBuffer;
027: import java.nio.channels.spi.AbstractInterruptibleChannel;
028: import java.nio.charset.Charset;
029: import java.nio.charset.CharsetDecoder;
030: import java.nio.charset.CharsetEncoder;
031: import org.apache.harmony.nio.internal.IOUtil;
032:
033: /**
034: * This class provides several utilities to get I/O streams from channels.
035: */
036: public final class Channels {
037:
038: /*
039: * Not intended to be instantiated.
040: */
041: private Channels() {
042: super ();
043: }
044:
045: /**
046: * Answers an input stream on the given channel
047: *
048: * @param channel
049: * The channel to be wrapped in an InputStream.
050: * @return an InputStream that takes bytes from the given byte channel.
051: */
052: public static InputStream newInputStream(ReadableByteChannel channel) {
053: return new ReadableByteChannelInputStream(channel);
054: }
055:
056: /**
057: * Answers an output stream on the given channel
058: *
059: * @param channel
060: * the channel to be wrapped in an OutputStream.
061: * @return an OutputStream that puts bytes onto the given byte channel.
062: */
063: public static OutputStream newOutputStream(
064: WritableByteChannel channel) {
065: return new WritableByteChannelOutputStream(channel);
066: }
067:
068: /**
069: * Answers a channel on the given input stream
070: *
071: * @param inputStream
072: * the stream to be wrapped in a byte channel.
073: * @return a byte channel that reads bytes from the input stream.
074: */
075: public static ReadableByteChannel newChannel(InputStream inputStream) {
076: return new ReadableByteChannelImpl(inputStream);
077: }
078:
079: /**
080: * Answers a channel on the given output stream
081: *
082: * @param outputStream
083: * the stream to be wrapped in a byte channel.
084: * @return a byte channel that writes bytes to the output stream.
085: */
086: public static WritableByteChannel newChannel(
087: OutputStream outputStream) {
088: return new WritableByteChannelImpl(outputStream);
089: }
090:
091: /**
092: * Answers a reader that decodes bytes from a channel.
093: *
094: * @param channel
095: * Channel to be read.
096: * @param decoder
097: * Charset decoder to be used.
098: * @param minBufferCapacity
099: * The minimum size of byte buffer, -1 means to use default size.
100: * @return The reader.
101: */
102: public static Reader newReader(ReadableByteChannel channel,
103: CharsetDecoder decoder, int minBufferCapacity) {
104: return new ByteChannelReader(new ReaderInputStream(channel),
105: decoder, minBufferCapacity);
106: }
107:
108: /**
109: * Answers a reader that decodes bytes from a channel.
110: *
111: * @param channel
112: * Channel to be read.
113: * @param charsetName
114: * Name of charset.
115: * @return The reader.
116: */
117: public static Reader newReader(ReadableByteChannel channel,
118: String charsetName) {
119: return newReader(channel, Charset.forName(charsetName)
120: .newDecoder(), -1);
121: }
122:
123: /**
124: * Answers a writer that encodes characters by encoder and output bytes to a
125: * channel.
126: *
127: * @param channel
128: * Channel to be written.
129: * @param encoder
130: * Charset decoder to be used.
131: * @param minBufferCapacity
132: * The minimum size of byte buffer, -1 means to use default size.
133: * @return The writer.
134: */
135: public static Writer newWriter(WritableByteChannel channel,
136: CharsetEncoder encoder, int minBufferCapacity) {
137: return new ByteChannelWriter(
138: new WritableByteChannelOutputStream(channel), encoder,
139: minBufferCapacity);
140: }
141:
142: /**
143: * Answers a writer that encodes characters by encoder and output bytes to a
144: * channel.
145: *
146: * @param channel
147: * Channel to be written.
148: * @param charsetName
149: * Name of charset.
150: * @return The writer.
151: */
152: public static Writer newWriter(WritableByteChannel channel,
153: String charsetName) {
154: return newWriter(channel, Charset.forName(charsetName)
155: .newEncoder(), -1);
156: }
157:
158: /*
159: * wrap a byte array to a ByteBuffer
160: */
161: static ByteBuffer wrapByteBuffer(byte[] bytes, int offset,
162: int length) {
163: ByteBuffer buffer = ByteBuffer.wrap(bytes);
164: int newLimit = offset + length <= buffer.capacity() ? offset
165: + length : buffer.capacity();
166: buffer.limit(newLimit);
167: buffer.position(offset);
168: return buffer;
169: }
170:
171: private static class ChannelInputStream extends InputStream {
172:
173: protected ReadableByteChannel channel;
174:
175: public ChannelInputStream(ReadableByteChannel aChannel) {
176: super ();
177: channel = aChannel;
178: }
179:
180: /**
181: * @see java.io.InputStream#read()
182: */
183: @Override
184: public synchronized int read() throws IOException {
185: byte[] oneByte = new byte[1];
186: int n = read(oneByte);
187: if (n == 1) {
188: // reads a single byte 0-255
189: return oneByte[0] & 0xff;
190: }
191: return -1;
192: }
193:
194: /**
195: * @see java.io.InputStream#close()
196: */
197: @Override
198: public synchronized void close() throws IOException {
199: channel.close();
200: }
201: }
202:
203: /*
204: * Wrapper class used for newInputStream(ReadableByteChannel channel)
205: */
206: private static class ReadableByteChannelInputStream extends
207: ChannelInputStream {
208:
209: public ReadableByteChannelInputStream(
210: ReadableByteChannel aChannel) {
211: super (aChannel);
212: }
213:
214: /**
215: * @see java.io.InputStream#read(byte[], int, int)
216: */
217: @Override
218: public synchronized int read(byte[] target, int offset,
219: int length) throws IOException {
220: // avoid int overflow, check null target
221: if (length + offset > target.length || length < 0
222: || offset < 0) {
223: throw new ArrayIndexOutOfBoundsException();
224: }
225: if (0 == length) {
226: return 0;
227: }
228: if (channel instanceof SelectableChannel) {
229: if (!((SelectableChannel) channel).isBlocking()) {
230: throw new IllegalBlockingModeException();
231: }
232: }
233: ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
234: return channel.read(buffer);
235: }
236: }
237:
238: /*
239: * Wrapper class used for newReader(ReadableByteChannel channel,
240: * CharsetDecoder decoder, int minBufferCapacity)
241: */
242: private static class ReaderInputStream extends ChannelInputStream {
243:
244: public ReaderInputStream(ReadableByteChannel aChannel) {
245: super (aChannel);
246: }
247:
248: /**
249: * @see java.io.InputStream#read(byte[], int, int)
250: */
251: @Override
252: public synchronized int read(byte[] target, int offset,
253: int length) throws IOException {
254: // avoid int overflow, check null target
255: if (length + offset > target.length || length < 0
256: || offset < 0) {
257: throw new ArrayIndexOutOfBoundsException();
258: }
259: if (0 == length) {
260: return 0;
261: }
262: ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
263: return channel.read(buffer);
264: }
265: }
266:
267: /*
268: * Wrapper class used for newOutputStream(WritableByteChannel channel)
269: */
270: private static class WritableByteChannelOutputStream extends
271: OutputStream {
272:
273: private WritableByteChannel channel;
274:
275: public WritableByteChannelOutputStream(
276: WritableByteChannel aChannel) {
277: super ();
278: channel = aChannel;
279: }
280:
281: /**
282: * @see java.io.OutputStream#write(int)
283: */
284: @Override
285: public synchronized void write(int oneByte) throws IOException {
286: byte[] wrappedByte = new byte[1];
287: wrappedByte[0] = (byte) oneByte;
288: write(wrappedByte);
289: }
290:
291: /**
292: * @see java.io.OutputStream#write(byte[], int, int)
293: */
294: @Override
295: public synchronized void write(byte[] source, int offset,
296: int length) throws IOException {
297: // avoid int overflow, check null source
298: if (length + offset > source.length || length < 0
299: || offset < 0) {
300: throw new ArrayIndexOutOfBoundsException();
301: }
302: if (0 == length) {
303: return;
304: }
305: if (channel instanceof SelectableChannel) {
306: if (!((SelectableChannel) channel).isBlocking()) {
307: throw new IllegalBlockingModeException();
308: }
309: }
310: ByteBuffer buffer = ByteBuffer.wrap(source, offset, length);
311: channel.write(buffer);
312: }
313:
314: /**
315: * @see java.io.OutputStream#close()
316: */
317: @Override
318: public synchronized void close() throws IOException {
319: channel.close();
320: }
321: }
322:
323: /*
324: * Wrapper class used for newChannel(InputStream inputStream)
325: */
326: private static class ReadableByteChannelImpl extends
327: AbstractInterruptibleChannel implements ReadableByteChannel {
328: private InputStream inputStream;
329:
330: ReadableByteChannelImpl(InputStream aInputStream) {
331: super ();
332: inputStream = aInputStream;
333: }
334:
335: /**
336: * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
337: */
338: public synchronized int read(ByteBuffer target)
339: throws IOException {
340: if (!isOpen()) {
341: throw new ClosedChannelException();
342: }
343: int bytesRemain = target.remaining();
344: byte[] bytes = new byte[bytesRemain];
345: int readCount = 0;
346: try {
347: begin();
348: readCount = inputStream.read(bytes);
349: } finally {
350: end(readCount >= 0);
351: }
352: if (readCount > 0) {
353: target.put(bytes, 0, readCount);
354: }
355: return readCount;
356: }
357:
358: /**
359: * @see java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel()
360: */
361: @Override
362: protected void implCloseChannel() throws IOException {
363: inputStream.close();
364: }
365: }
366:
367: /*
368: * Wrapper class used for newChannel(OutputStream outputStream)
369: */
370: private static class WritableByteChannelImpl extends
371: AbstractInterruptibleChannel implements WritableByteChannel {
372: private OutputStream outputStream;
373:
374: WritableByteChannelImpl(OutputStream aOutputStream) {
375: super ();
376: outputStream = aOutputStream;
377: }
378:
379: /**
380: * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
381: */
382: public synchronized int write(ByteBuffer source)
383: throws IOException {
384: if (!isOpen()) {
385: throw new ClosedChannelException();
386: }
387: int bytesRemain = source.remaining();
388: if (bytesRemain == 0) {
389: return 0;
390: }
391: byte[] buf = new byte[bytesRemain];
392: source.get(buf);
393: try {
394: begin();
395: outputStream.write(buf, 0, bytesRemain);
396: } finally {
397: end(bytesRemain >= 0);
398: }
399: return bytesRemain;
400: }
401:
402: /**
403: * @see java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel()
404: */
405: @Override
406: protected void implCloseChannel() throws IOException {
407: outputStream.close();
408: }
409: }
410:
411: /*
412: * Wrapper class used for newReader(ReadableByteChannel channel,
413: * CharsetDecoder decoder, int minBufferCapacity)
414: */
415: private static class ByteChannelReader extends Reader {
416:
417: private InputStream inputStream;
418:
419: private static final int BUFFER_SIZE = 8192;
420:
421: CharsetDecoder decoder;
422:
423: ByteBuffer bytes;
424:
425: CharBuffer chars;
426:
427: public ByteChannelReader(InputStream aInputStream,
428: CharsetDecoder aDecoder, int minBufferCapacity) {
429: super (aInputStream);
430: aDecoder.reset();
431: inputStream = aInputStream;
432: int bufferSize = Math.max(minBufferCapacity, BUFFER_SIZE);
433: bytes = ByteBuffer.allocate(bufferSize);
434: chars = CharBuffer.allocate(bufferSize);
435: decoder = aDecoder;
436: chars.limit(0);
437: }
438:
439: /**
440: * @see java.io.Reader#close()
441: */
442: @Override
443: public void close() throws IOException {
444: synchronized (lock) {
445: decoder = null;
446: if (inputStream != null) {
447: inputStream.close();
448: inputStream = null;
449: }
450: }
451: }
452:
453: /**
454: * @see java.io.Reader#ready()
455: */
456: @Override
457: public boolean ready() {
458: synchronized (lock) {
459: if (null == inputStream) {
460: return false;
461: }
462: try {
463: return chars.limit() > chars.position()
464: || inputStream.available() > 0;
465: } catch (IOException e) {
466: return false;
467: }
468: }
469: }
470:
471: /**
472: * @see java.io.Reader#read()
473: */
474: @Override
475: public int read() throws IOException {
476: return IOUtil.readInputStreamReader(inputStream, bytes,
477: chars, decoder, lock);
478: }
479:
480: /**
481: * @see java.io.Reader#read(char[], int, int)
482: */
483: @Override
484: public int read(char[] buf, int offset, int length)
485: throws IOException {
486: return IOUtil.readInputStreamReader(buf, offset, length,
487: inputStream, bytes, chars, decoder, lock);
488: }
489: }
490:
491: /*
492: * Wrapper class used for newWriter(WritableByteChannel channel,
493: * CharsetEncoder encoder, int minBufferCapacity)
494: */
495: private static class ByteChannelWriter extends Writer {
496:
497: private static final int BUFFER_SIZE = 8192;
498:
499: private OutputStream outputStream;
500:
501: private CharsetEncoder encoder;
502:
503: private ByteBuffer byteBuf;
504:
505: public ByteChannelWriter(OutputStream aOutputStream,
506: CharsetEncoder aEncoder, int minBufferCap) {
507: super (aOutputStream);
508: aEncoder.charset();
509: outputStream = aOutputStream;
510: byteBuf = ByteBuffer.allocate(Math.max(minBufferCap,
511: BUFFER_SIZE));
512: encoder = aEncoder;
513: }
514:
515: /**
516: * @see java.io.Writer#close()
517: */
518: @Override
519: public void close() throws IOException {
520: synchronized (lock) {
521: if (encoder != null) {
522: flush();
523: outputStream.flush();
524: outputStream.close();
525: encoder = null;
526: byteBuf = null;
527: }
528: }
529: }
530:
531: /**
532: * @see java.io.Writer#flush()
533: */
534: @Override
535: public void flush() throws IOException {
536: IOUtil.flushOutputStreamWriter(outputStream, byteBuf,
537: encoder, lock);
538: }
539:
540: /**
541: * @see java.io.Writer#write(char[], int, int)
542: */
543: @Override
544: public void write(char[] buf, int offset, int count)
545: throws IOException {
546: IOUtil.writeOutputStreamWriter(buf, offset, count,
547: outputStream, byteBuf, encoder, lock);
548: }
549:
550: /**
551: * @see java.io.Writer#write(int)
552: */
553: @Override
554: public void write(int oneChar) throws IOException {
555: IOUtil.writeOutputStreamWriter(oneChar, outputStream,
556: byteBuf, encoder, lock);
557: }
558:
559: /**
560: * @see java.io.Writer#write(java.lang.String, int, int)
561: */
562: @Override
563: public void write(String str, int offset, int count)
564: throws IOException {
565: IOUtil.writeOutputStreamWriter(str, offset, count,
566: outputStream, byteBuf, encoder, lock);
567: }
568: }
569: }
|