001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.io;
005:
006: import com.tc.bytes.TCByteBuffer;
007: import com.tc.bytes.TCByteBufferFactory;
008: import com.tc.util.Assert;
009:
010: import java.io.DataOutputStream;
011: import java.io.IOException;
012: import java.io.OutputStream;
013: import java.io.UTFDataFormatException;
014: import java.util.ArrayList;
015: import java.util.Collections;
016: import java.util.IdentityHashMap;
017: import java.util.Iterator;
018: import java.util.List;
019: import java.util.Map;
020:
021: /**
022: * Use me to write data to a set of TCByteBuffer instances. <br>
023: * <br>
024: * NOTE: This class never throws java.io.IOException (unlike the generic OutputStream) class
025: */
026: public class TCByteBufferOutputStream extends OutputStream implements
027: TCByteBufferOutput {
028:
029: private static final int DEFAULT_MAX_BLOCK_SIZE = 4096;
030: private static final int DEFAULT_INITIAL_BLOCK_SIZE = 32;
031:
032: private final boolean direct;
033: private final int maxBlockSize;
034: private final DataOutputStream dos;
035:
036: // The "buffers" list is accesed by index in the Mark class, thus it should not be a linked list
037: private List buffers = new ArrayList();
038: private Map localBuffers = new IdentityHashMap();
039: private TCByteBuffer current;
040: private boolean closed;
041: private int written;
042: private int blockSize;
043:
044: // TODO: Provide a method to write buffers to another output stream
045: // TODO: Provide a method to turn the buffers into an input stream with minimal cost (ie. no consolidation, no
046: // duplicate(), etc)
047:
048: public TCByteBufferOutputStream() {
049: this (DEFAULT_INITIAL_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE, false);
050: }
051:
052: public TCByteBufferOutputStream(int blockSize, boolean direct) {
053: this (blockSize, blockSize, false);
054: }
055:
056: public TCByteBufferOutputStream(int initialBlockSize,
057: int maxBlockSize, boolean direct) {
058: if (maxBlockSize < 1) {
059: throw new IllegalArgumentException(
060: "Max block size must be greater than or equal to 1");
061: }
062: if (initialBlockSize < 1) {
063: throw new IllegalArgumentException(
064: "Initial block size must be greater than or equal to 1");
065: }
066:
067: if (maxBlockSize < initialBlockSize) {
068: throw new IllegalArgumentException(
069: "Initial block size less than max block size");
070: }
071:
072: this .maxBlockSize = maxBlockSize;
073: this .blockSize = initialBlockSize;
074: this .direct = direct;
075: this .closed = false;
076: this .dos = new DataOutputStream(this );
077: addBuffer();
078: }
079:
080: /**
081: * Create a "mark" in this stream. A mark can be used to fixup data in an earlier portion of the stream even after you
082: * have written past it. One place this is useful is when you need to backtrack and fill in a length field after
083: * writing some arbitrary data to the stream
084: */
085: public Mark mark() {
086: checkClosed();
087: return new Mark(buffers.size(), current.position(),
088: getBytesWritten());
089: }
090:
091: public void write(int b) {
092: checkClosed();
093:
094: written++;
095:
096: if (!current.hasRemaining()) {
097: addBuffer();
098: }
099:
100: current.put((byte) b);
101: }
102:
103: public void write(byte b[]) {
104: write(b, 0, b.length);
105: }
106:
107: public void write(TCByteBuffer data) {
108: if (data == null) {
109: throw new NullPointerException();
110: }
111: write(new TCByteBuffer[] { data });
112: }
113:
114: /**
115: * Add arbitrary buffers into the stream. All of the data (from position 0 to limit()) in each buffer passed will be
116: * used in the stream. If that is not what you want, setup your buffers differently before calling this write()
117: */
118: public void write(TCByteBuffer[] data) {
119: checkClosed();
120: if (data == null) {
121: throw new NullPointerException();
122: }
123: if (data.length == 0) {
124: return;
125: }
126:
127: // deal with the current buffer
128: final boolean reuseCurrent = current.position() == 0;
129:
130: if (!reuseCurrent) {
131: // shrink and make it look like "full"
132: buffers.add(current.limit(current.position()).position(0));
133: }
134:
135: for (int i = 0, n = data.length; i < n; i++) {
136: int len = data[i].limit();
137: if (len == 0) {
138: continue;
139: }
140:
141: written += len;
142: buffers.add(data[i].duplicate().position(0));
143: }
144:
145: if (!reuseCurrent) {
146: current = (TCByteBuffer) buffers.remove(buffers.size() - 1);
147: current.position(current.limit());
148: }
149: }
150:
151: public int getBytesWritten() {
152: return written;
153: }
154:
155: public void write(byte b[], final int offset, final int length) {
156: checkClosed();
157:
158: if (b == null) {
159: throw new NullPointerException();
160: }
161:
162: if ((offset < 0) || (offset > b.length) || (length < 0)
163: || ((offset + length) > b.length)) {
164: throw new IndexOutOfBoundsException();
165: }
166:
167: if (length == 0) {
168: return;
169: }
170:
171: // do this after the checks (ie. don't corrupt the counter if bogus args passed)
172: written += length;
173:
174: int index = offset;
175: int numToWrite = length;
176: while (numToWrite > 0) {
177: if (!current.hasRemaining()) {
178: addBuffer();
179: }
180: final int numToPut = Math.min(current.remaining(),
181: numToWrite);
182: current.put(b, index, numToPut);
183: numToWrite -= numToPut;
184: index += numToPut;
185: }
186: }
187:
188: public void close() {
189: if (!closed) {
190: finalizeBuffers();
191: closed = true;
192: }
193: }
194:
195: /**
196: * Obtain the contents of this stream as an array of TCByteBuffer
197: */
198: public TCByteBuffer[] toArray() {
199: close();
200: TCByteBuffer[] rv = new TCByteBuffer[buffers.size()];
201: return (TCByteBuffer[]) buffers.toArray(rv);
202: }
203:
204: public String toString() {
205: return (buffers == null) ? "null" : buffers.toString();
206: }
207:
208: private void addBuffer() {
209: if (current != null) {
210: current.flip();
211: buffers.add(current);
212:
213: // use a buffer twice as big as the previous, at least until we hit the maximum block size allowed for this stream
214: if (blockSize < maxBlockSize) {
215: blockSize *= 2;
216:
217: if (blockSize > maxBlockSize) {
218: blockSize = maxBlockSize;
219: }
220: }
221: }
222:
223: current = TCByteBufferFactory.getInstance(direct, blockSize);
224: blockSize = current.capacity();
225: localBuffers.put(current, current);
226: }
227:
228: private void finalizeBuffers() {
229: if (current.position() > 0) {
230: current.flip();
231: buffers.add(current);
232: }
233:
234: current = null;
235:
236: List finalBufs = new ArrayList();
237: TCByteBuffer[] bufs = new TCByteBuffer[buffers.size()];
238: bufs = (TCByteBuffer[]) buffers.toArray(bufs);
239:
240: final int num = bufs.length;
241: int index = 0;
242:
243: // fixup "small" buffers consolidating them into buffers as close to maxBlockSize as possible
244: while (index < num) {
245: final int startIndex = index;
246: int size = bufs[startIndex].limit();
247:
248: if (size < maxBlockSize) {
249: while (index < (num - 1)) {
250: int nextSize = bufs[index + 1].limit();
251: if ((size + nextSize) <= maxBlockSize) {
252: size += nextSize;
253: index++;
254: } else {
255: break;
256: }
257: }
258: }
259:
260: if (index > startIndex) {
261: TCByteBuffer consolidated = TCByteBufferFactory
262: .getInstance(direct, size);
263: localBuffers.put(consolidated, consolidated);
264: final int end = index;
265: for (int i = startIndex; i <= end; i++) {
266: consolidated.put(bufs[i]);
267: if (localBuffers.remove(bufs[i]) != null) {
268: bufs[i].recycle();
269: }
270: }
271: Assert.assertEquals(size, consolidated.position());
272: consolidated.flip();
273: finalBufs.add(consolidated);
274: } else {
275: finalBufs.add(bufs[index]);
276: }
277:
278: index++;
279: }
280:
281: buffers = Collections.unmodifiableList(finalBufs);
282: }
283:
284: public final void writeBoolean(boolean value) {
285: try {
286: dos.writeBoolean(value);
287: } catch (IOException e) {
288: throw new AssertionError(e);
289: }
290: }
291:
292: public final void writeByte(int value) {
293: try {
294: dos.writeByte(value);
295: } catch (IOException e) {
296: throw new AssertionError(e);
297: }
298: }
299:
300: public final void writeChar(int value) {
301: try {
302: dos.writeChar(value);
303: } catch (IOException e) {
304: throw new AssertionError(e);
305: }
306: }
307:
308: public final void writeDouble(double value) {
309: try {
310: dos.writeDouble(value);
311: } catch (IOException e) {
312: throw new AssertionError(e);
313: }
314: }
315:
316: public final void writeFloat(float value) {
317: try {
318: dos.writeFloat(value);
319: } catch (IOException e) {
320: throw new AssertionError(e);
321: }
322: }
323:
324: public final void writeInt(int value) {
325: try {
326: dos.writeInt(value);
327: } catch (IOException e) {
328: throw new AssertionError(e);
329: }
330: }
331:
332: public final void writeLong(long value) {
333: try {
334: dos.writeLong(value);
335: } catch (IOException e) {
336: throw new AssertionError(e);
337: }
338: }
339:
340: public final void writeShort(int value) {
341: try {
342: dos.writeShort(value);
343: } catch (IOException e) {
344: throw new AssertionError(e);
345: }
346: }
347:
348: public final void writeString(String string) {
349: writeString(string, false);
350: }
351:
352: public final void writeString(String string, boolean forceRaw) {
353: // Is null? (true/false)
354: if (string == null) {
355: writeBoolean(true);
356: return;
357: } else {
358: writeBoolean(false);
359: }
360:
361: if (!forceRaw) {
362: Mark mark = mark();
363: // is UTF encoded? 1(true) or 0(false)
364: write(1);
365:
366: try {
367: dos.writeUTF(string);
368: // No exception, just return
369: return;
370: } catch (IOException e) {
371: if (!(e instanceof UTFDataFormatException)) {
372: throw new AssertionError(e);
373: }
374: // String too long, encode as raw chars
375: mark.write(0);
376: }
377: } else {
378: write(0);
379: }
380:
381: writeStringAsRawChars(string);
382: }
383:
384: private void writeStringAsRawChars(String string) {
385: if (string == null) {
386: throw new AssertionError();
387: }
388: writeInt(string.length());
389: try {
390: dos.writeChars(string);
391: } catch (IOException e) {
392: throw new AssertionError(e);
393: }
394: }
395:
396: private void checkClosed() {
397: if (closed) {
398: throw new IllegalStateException("stream is closed");
399: }
400: }
401:
402: // This class could be fancier:
403: // - Support the TCDataOutput interface
404: // - Allow writing through the mark to grow the buffer list
405: // - etc, etc, etc
406: public class Mark {
407: private final int bufferIndex;
408: private final int bufferPosition;
409: private final int absolutePosition;
410:
411: private Mark(int bufferIndex, int bufferPosition,
412: int absolutePosition) {
413: this .bufferIndex = bufferIndex;
414: this .bufferPosition = bufferPosition;
415: this .absolutePosition = absolutePosition;
416: }
417:
418: public int getPosition() {
419: return this .absolutePosition;
420: }
421:
422: /**
423: * Write the given byte array at the position designated by this mark
424: */
425: public void write(byte[] data) {
426: checkClosed();
427:
428: if (data == null) {
429: throw new NullPointerException();
430: }
431:
432: if (data.length == 0) {
433: return;
434: }
435:
436: if (getBytesWritten() - absolutePosition < data.length) {
437: throw new IllegalArgumentException(
438: "Cannot write past the existing tail of stream via the mark");
439: }
440:
441: TCByteBuffer buf = getBuffer(bufferIndex);
442:
443: int bufIndex = bufferIndex;
444: int bufPos = bufferPosition;
445: int dataIndex = 0;
446: int numToWrite = data.length;
447:
448: while (numToWrite > 0) {
449: int howMany = Math
450: .min(numToWrite, buf.limit() - bufPos);
451:
452: if (howMany > 0) {
453: buf.put(bufPos, data, dataIndex, howMany);
454: dataIndex += howMany;
455: numToWrite -= howMany;
456: if (numToWrite == 0) {
457: return;
458: }
459: }
460:
461: buf = getBuffer(++bufIndex);
462: bufPos = 0;
463: }
464: }
465:
466: private TCByteBuffer getBuffer(int index) {
467: if (index <= buffers.size() - 1) {
468: return (TCByteBuffer) buffers.get(index);
469: } else if (index == buffers.size()) {
470: return current;
471: } else {
472: throw Assert.failure("index=" + index
473: + ", buffers.size()=" + buffers.size());
474: }
475: }
476:
477: /**
478: * Write a single byte at the given mark. Calling write(int) multiple times will simply overwrite the same byte over
479: * and over
480: */
481: public void write(int b) {
482: write(new byte[] { (byte) b });
483: }
484: }
485:
486: public void recycle() {
487: if (localBuffers.size() > 0) {
488: for (Iterator i = localBuffers.keySet().iterator(); i
489: .hasNext();) {
490: TCByteBuffer buffer = (TCByteBuffer) i.next();
491: buffer.recycle();
492: }
493: }
494: }
495:
496: }
|