001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2007 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.lib.web.micro.mts;
028:
029: import java.io.ByteArrayOutputStream;
030: import java.io.InputStream;
031: import java.io.IOException;
032: import java.util.Collections;
033: import java.util.LinkedList;
034: import java.util.List;
035: import java.util.Map;
036:
037: import org.cougaar.lib.web.micro.base.AnnotatedInputStream;
038:
039: /**
040: * An input stream pipe.
041: * <p>
042: * One thread calls "deliver" to add to the pipe, and another thread
043: * reads from the "input_stream".
044: */
045: public class InputPipe implements Deliverer {
046:
047: // a list of buffered byte[]s and Tokens
048: private final LinkedList queue = new LinkedList();
049:
050: private final InputStreamImpl in;
051:
052: private Map metaData;
053: private int counter = -1;
054: private boolean in_closed;
055: private boolean out_closed;
056:
057: public InputPipe() {
058: in = new InputStreamImpl();
059: }
060:
061: /** Non-blocking deliver call */
062: public void deliver(int counter, Map metaData, List data) {
063: synchronized (queue) {
064: // check if closed
065: if (in_closed) {
066: // our local client closed the input stream (?)
067: return;
068: }
069:
070: // update counter
071: if (counter != this .counter + 1) {
072: throw new RuntimeException("Unexpected counter value "
073: + counter + ", expecting " + (this .counter + 1));
074: }
075: this .counter = counter;
076:
077: if (counter == 0) {
078: // save metadata
079: this .metaData = (metaData == null ? Collections.EMPTY_MAP
080: : metaData);
081: }
082:
083: // append to queue
084: queue.addAll(data);
085: queue.notifyAll();
086: }
087: }
088:
089: public Map getMetaData() {
090: synchronized (queue) {
091: while (metaData == null && (!in_closed && !out_closed)) {
092: try {
093: // TODO have max-wait limit for stream timeout
094: queue.wait();
095: } catch (InterruptedException ie) {
096: throw new RuntimeException("interrupted");
097: }
098: }
099: return metaData;
100: }
101: }
102:
103: public AnnotatedInputStream getInputStream() {
104: return in;
105: }
106:
107: public void close() {
108: in.close();
109: }
110:
111: private class InputStreamImpl extends AnnotatedInputStream {
112: // our current byte[], removed from the head of the list
113: private byte[] buf = null;
114: private int offset = 0;
115:
116: // for "int read2()"
117: private byte[] tmp = new byte[1];
118:
119: public int read2() {
120: synchronized (queue) {
121: while (true) {
122: int count = _read2(tmp, 0, 1);
123: switch (count) {
124: case 1:
125: return tmp[0];
126: case 0:
127: break;
128: case -1:
129: return -1;
130: case NOOP:
131: return NOOP;
132: case FLUSH:
133: return FLUSH;
134: default:
135: throw new RuntimeException(
136: "Invalid read count: " + count);
137: }
138: }
139: }
140: }
141:
142: public int read2(byte b[]) {
143: return read2(b, 0, b.length);
144: }
145:
146: public int read2(byte[] b, int off, int len) {
147: if (b == null) {
148: throw new NullPointerException();
149: } else if ((off < 0) || (off > b.length) || (len < 0)
150: || ((off + len) > b.length) || ((off + len) < 0)) {
151: throw new IndexOutOfBoundsException();
152: } else if (len == 0) {
153: return 0;
154: }
155: synchronized (queue) {
156: return _read2(b, off, len);
157: }
158: }
159:
160: private int _read2(byte[] b, int off, int len) {
161: assert Thread.holdsLock(queue);
162:
163: if (buf == null) {
164: while (true) {
165: if (out_closed || in_closed)
166: return -1;
167: if (!queue.isEmpty())
168: break;
169: try {
170: // TODO have max-wait limit for stream timeout
171: queue.wait();
172: } catch (InterruptedException ie) {
173: throw new RuntimeException("interrupted");
174: }
175: }
176: Object o = queue.removeFirst();
177: if (o instanceof ByteArrayOutputStream) {
178: // our InputPipe is supposed to convert these, but we'll
179: // catch this case regardless
180: o = ((ByteArrayOutputStream) o).toByteArray();
181: }
182: if (o instanceof byte[]) {
183: buf = (byte[]) o;
184: offset = 0;
185: } else if (o == Tokens.CLOSE) {
186: out_closed = true;
187: return -1;
188: } else if (o == Tokens.FLUSH) {
189: return FLUSH;
190: } else if (o == Tokens.NOOP) {
191: return NOOP;
192: } else {
193: throw new RuntimeException("Unexpected type: "
194: + (o == null ? "null" : o.getClass()
195: .getName()));
196: }
197: }
198:
199: int n = Math.min(buf.length - offset, len);
200: System.arraycopy(buf, offset, b, off, n);
201: offset += n;
202: if (offset == buf.length) {
203: buf = null;
204: offset = 0;
205: }
206: return n;
207: }
208:
209: public void close() {
210: synchronized (queue) {
211: if (in_closed)
212: return;
213: in_closed = true;
214: queue.clear();
215: buf = null;
216: offset = 0;
217: queue.notifyAll();
218: }
219: }
220: }
221: }
|