001: /*
002: * Copyright 2005-2007 Noelios Consulting.
003: *
004: * The contents of this file are subject to the terms of the Common Development
005: * and Distribution License (the "License"). You may not use this file except in
006: * compliance with the License.
007: *
008: * You can obtain a copy of the license at
009: * http://www.opensource.org/licenses/cddl1.txt See the License for the specific
010: * language governing permissions and limitations under the License.
011: *
012: * When distributing Covered Code, include this CDDL HEADER in each file and
013: * include the License file at http://www.opensource.org/licenses/cddl1.txt If
014: * applicable, add the following below this CDDL HEADER, with the fields
015: * enclosed by brackets "[]" replaced with your own identifying information:
016: * Portions Copyright [yyyy] [name of copyright owner]
017: */
018:
019: package org.restlet.util;
020:
021: import java.io.BufferedReader;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.InputStreamReader;
025: import java.io.OutputStream;
026: import java.nio.ByteBuffer;
027: import java.nio.channels.Channels;
028: import java.nio.channels.ClosedChannelException;
029: import java.nio.channels.Pipe;
030: import java.nio.channels.ReadableByteChannel;
031: import java.nio.channels.SelectableChannel;
032: import java.nio.channels.SelectionKey;
033: import java.nio.channels.Selector;
034: import java.nio.channels.WritableByteChannel;
035: import java.util.concurrent.ArrayBlockingQueue;
036: import java.util.concurrent.BlockingQueue;
037: import java.util.concurrent.TimeUnit;
038:
039: import org.restlet.data.CharacterSet;
040: import org.restlet.resource.Representation;
041:
042: /**
043: * Byte manipulation utilities.
044: *
045: * @author Jerome Louvel (contact@noelios.com)
046: */
047: public final class ByteUtils {
048: /**
049: * Returns a readable byte channel based on a given inputstream. If it is
050: * supported by a file a read-only instance of FileChannel is returned.
051: *
052: * @param inputStream
053: * The input stream to convert.
054: * @return A readable byte channel.
055: */
056: public static ReadableByteChannel getChannel(InputStream inputStream)
057: throws IOException {
058: return (inputStream != null) ? Channels.newChannel(inputStream)
059: : null;
060: }
061:
062: /**
063: * Returns a writable byte channel based on a given output stream.
064: *
065: * @param outputStream
066: * The output stream.
067: */
068: public static WritableByteChannel getChannel(
069: OutputStream outputStream) throws IOException {
070: return (outputStream != null) ? Channels
071: .newChannel(outputStream) : null;
072: }
073:
074: /**
075: * Returns a readable byte channel based on the given representation's
076: * content and its write(WritableByteChannel) method. Internally, it uses a
077: * writer thread and a pipe stream.
078: *
079: * @return A readable byte channel.
080: */
081: public static ReadableByteChannel getChannel(
082: final Representation representation) throws IOException {
083: final Pipe pipe = Pipe.open();
084:
085: // Creates a thread that will handle the task of continuously
086: // writing the representation into the input side of the pipe
087: Thread writer = new Thread() {
088: public void run() {
089: try {
090: WritableByteChannel wbc = pipe.sink();
091: representation.write(wbc);
092: wbc.close();
093: } catch (IOException ioe) {
094: ioe.printStackTrace();
095: }
096: }
097: };
098: writer.setDaemon(false);
099:
100: // Starts the writer thread
101: writer.start();
102: return pipe.source();
103: }
104:
105: /**
106: * Returns an input stream based on a given readable byte channel.
107: *
108: * @param readableChannel
109: * The readable byte channel.
110: * @return An input stream based on a given readable byte channel.
111: */
112: public static InputStream getStream(
113: ReadableByteChannel readableChannel) throws IOException {
114: return (readableChannel != null) ? Channels
115: .newInputStream(readableChannel) : null;
116: }
117:
118: /**
119: * Returns an input stream based on the given representation's content and
120: * its write(OutputStream) method. Internally, it uses a writer thread and a
121: * pipe stream.
122: *
123: * @return A stream with the representation's content.
124: */
125: public static InputStream getStream(
126: final Representation representation) throws IOException {
127: if (representation != null) {
128: final PipeStream pipe = new PipeStream();
129:
130: // Creates a thread that will handle the task of continuously
131: // writing the representation into the input side of the pipe
132: Thread writer = new Thread() {
133: public void run() {
134: try {
135: OutputStream os = pipe.getOutputStream();
136: representation.write(os);
137: os.write(-1);
138: os.close();
139: } catch (IOException ioe) {
140: ioe.printStackTrace();
141: }
142: }
143: };
144: writer.setDaemon(false);
145:
146: // Starts the writer thread
147: writer.start();
148: return pipe.getInputStream();
149: } else {
150: return null;
151: }
152: }
153:
154: /**
155: * Returns an output stream based on a given writable byte channel.
156: *
157: * @param writableChannel
158: * The writable byte channel.
159: * @return An output stream based on a given writable byte channel.
160: */
161: public static OutputStream getStream(
162: WritableByteChannel writableChannel) {
163: OutputStream result = null;
164:
165: if (writableChannel instanceof SelectableChannel) {
166: SelectableChannel selectableChannel = (SelectableChannel) writableChannel;
167:
168: synchronized (selectableChannel.blockingLock()) {
169: if (selectableChannel.isBlocking()) {
170: result = Channels.newOutputStream(writableChannel);
171: } else {
172: result = new NbChannelOutputStream(writableChannel);
173: }
174: }
175: } else {
176: result = new NbChannelOutputStream(writableChannel);
177: }
178:
179: return result;
180: }
181:
182: /**
183: * Converts an input stream to a string.<br/>As this method uses the
184: * InputstreamReader class, the default character set is used for decoding
185: * the input stream.
186: *
187: * @see <a
188: * href="http://java.sun.com/j2se/1.5.0/docs/api/java/io/InputStreamReader.html">InputStreamReader
189: * class</a>
190: * @see #toString(InputStream, CharacterSet)
191: * @param inputStream
192: * The input stream.
193: * @return The converted string.
194: */
195: public static String toString(InputStream inputStream) {
196: return toString(inputStream, null);
197: }
198:
199: /**
200: * Converts an input stream to a string using the specified character set
201: * for decoding the input stream.
202: *
203: * @see <a
204: * href="http://java.sun.com/j2se/1.5.0/docs/api/java/io/InputStreamReader.html">InputStreamReader
205: * class</a>
206: * @param inputStream
207: * The input stream.
208: * @param characterSet
209: * The character set
210: * @return The converted string.
211: */
212: public static String toString(InputStream inputStream,
213: CharacterSet characterSet) {
214: String result = null;
215:
216: if (inputStream != null) {
217: try {
218: StringBuilder sb = new StringBuilder();
219: InputStreamReader isr = null;
220: if (characterSet != null) {
221: isr = new InputStreamReader(inputStream,
222: characterSet.getName());
223: } else {
224: isr = new InputStreamReader(inputStream);
225: }
226: BufferedReader br = new BufferedReader(isr);
227: int nextByte = br.read();
228: while (nextByte != -1) {
229: sb.append((char) nextByte);
230: nextByte = br.read();
231: }
232: br.close();
233: result = sb.toString();
234: } catch (Exception e) {
235: // Returns an empty string
236: }
237: }
238:
239: return result;
240: }
241:
242: /**
243: * Writes an input stream to an output stream. When the reading is done, the
244: * input stream is closed.
245: *
246: * @param inputStream
247: * The input stream.
248: * @param outputStream
249: * The output stream.
250: * @throws IOException
251: */
252: public static void write(InputStream inputStream,
253: OutputStream outputStream) throws IOException {
254: int bytesRead;
255: byte[] buffer = new byte[2048];
256: while ((bytesRead = inputStream.read(buffer)) > 0) {
257: outputStream.write(buffer, 0, bytesRead);
258: }
259: inputStream.close();
260: }
261:
262: /**
263: * Writes a readable channel to a writable channel. It assumes that the
264: * readable and writable channels are both in NIO blocking mode.
265: *
266: * @param readableChannel
267: * The readable channel.
268: * @param writableChannel
269: * The writable channel.
270: * @throws IOException
271: */
272: public static void write(ReadableByteChannel readableChannel,
273: WritableByteChannel writableChannel) throws IOException {
274: if ((readableChannel != null) && (writableChannel != null)) {
275: write(Channels.newInputStream(readableChannel), Channels
276: .newOutputStream(writableChannel));
277: }
278: }
279:
280: /**
281: * Private constructor to ensure that the class acts as a true utility class
282: * i.e. it isn't instantiable and extensible.
283: */
284: private ByteUtils() {
285:
286: }
287:
288: /**
289: * Pipe stream that pipes output streams into input streams. Implementation
290: * based on a shared synchronized queue.
291: *
292: * @author Jerome Louvel (contact@noelios.com)
293: */
294: private final static class PipeStream {
295: private static final long QUEUE_TIMEOUT = 5;
296:
297: /** The supporting synchronized queue. */
298: private final BlockingQueue<Integer> queue;
299:
300: /** Constructor. */
301: public PipeStream() {
302: this .queue = new ArrayBlockingQueue<Integer>(1024);
303: }
304:
305: /**
306: * Returns a new input stream that can read from the pipe.
307: *
308: * @return A new input stream that can read from the pipe.
309: */
310: public InputStream getInputStream() {
311: return new InputStream() {
312: private boolean endReached = false;
313:
314: public int read() throws IOException {
315: try {
316: if (endReached)
317: return -1;
318:
319: Integer value = queue.poll(QUEUE_TIMEOUT,
320: TimeUnit.SECONDS);
321: if (value == null) {
322: throw new IOException(
323: "Timeout while reading from the queue-based input stream");
324: }
325:
326: endReached = (value.intValue() == -1);
327: return value;
328: } catch (InterruptedException ie) {
329: throw new IOException(
330: "Interruption occurred while writing in the queue");
331: }
332: }
333: };
334: }
335:
336: /**
337: * Returns a new output stream that can write into the pipe.
338: *
339: * @return A new output stream that can write into the pipe.
340: */
341: public OutputStream getOutputStream() {
342: return new OutputStream() {
343: public void write(int b) throws IOException {
344: try {
345: if (!queue.offer(b, QUEUE_TIMEOUT,
346: TimeUnit.SECONDS)) {
347: throw new IOException(
348: "Timeout while writing to the queue-based output stream");
349: }
350: } catch (InterruptedException ie) {
351: throw new IOException(
352: "Interruption occurred while writing in the queue");
353: }
354: }
355: };
356: }
357:
358: }
359:
360: /**
361: * Output stream connected to a non-blocking writable channel.
362: */
363: private final static class NbChannelOutputStream extends
364: OutputStream {
365: /** The channel to write to. */
366: private WritableByteChannel channel;
367:
368: private Selector selector;
369:
370: private SelectionKey selectionKey;
371:
372: private SelectableChannel selectableChannel;
373:
374: /**
375: * Constructor.
376: *
377: * @param channel
378: * The wrapped channel.
379: */
380: public NbChannelOutputStream(WritableByteChannel channel) {
381: this .channel = channel;
382:
383: if (!(channel instanceof SelectableChannel)) {
384: throw new IllegalArgumentException(
385: "Invalid channel provided. Please use only selectable channels.");
386: } else {
387: this .selectableChannel = (SelectableChannel) channel;
388: this .selector = null;
389: this .selectionKey = null;
390:
391: if (this .selectableChannel.isBlocking()) {
392: throw new IllegalArgumentException(
393: "Invalid blocking channel provided. Please use only non-blocking channels.");
394: }
395: }
396: }
397:
398: @Override
399: public void write(int b) throws IOException {
400: ByteBuffer bb = ByteBuffer.wrap(new byte[] { (byte) b });
401:
402: if ((this .channel != null) && (bb != null)) {
403: try {
404: int bytesWritten;
405:
406: while (bb.hasRemaining()) {
407: bytesWritten = this .channel.write(bb);
408:
409: if (bytesWritten < 0) {
410: throw new IOException(
411: "Unexpected negative number of bytes written.");
412: } else if (bytesWritten == 0) {
413: registerSelectionKey();
414:
415: if (getSelector().select(10000) == 0) {
416: throw new IOException(
417: "Unable to select the channel to write to it. Selection timed out.");
418: }
419: }
420: }
421: } catch (IOException ioe) {
422: throw new IOException(
423: "Unable to write to the non-blocking channel. "
424: + ioe.getLocalizedMessage());
425: }
426: } else {
427: throw new IOException(
428: "Unable to write. Null byte buffer or channel detected.");
429: }
430: }
431:
432: private Selector getSelector() throws IOException {
433: if (this .selector == null)
434: this .selector = Selector.open();
435:
436: return this .selector;
437: }
438:
439: private void registerSelectionKey()
440: throws ClosedChannelException, IOException {
441: this .selectionKey = this .selectableChannel.register(
442: getSelector(), SelectionKey.OP_WRITE);
443: }
444:
445: @Override
446: public void close() throws IOException {
447: if (this.selectionKey != null) {
448: this.selectionKey.cancel();
449: }
450:
451: if (this.selector != null)
452: this.selector.close();
453:
454: super.close();
455: }
456: }
457:
458: }
|