001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2007 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.lib.web.micro.mts;
028:
029: import java.io.ByteArrayOutputStream;
030: import java.io.OutputStream;
031: import java.util.ArrayList;
032: import java.util.Collections;
033: import java.util.List;
034: import java.util.Map;
035:
036: import org.cougaar.core.service.LoggingService;
037: import org.cougaar.core.service.ThreadService;
038: import org.cougaar.core.thread.Schedulable;
039: import org.cougaar.lib.web.micro.base.AnnotatedOutputStream;
040:
041: /**
042: * An output stream pipe.
043: * <p>
044: * One thread writes to the "output_stream", and another thread (spawned by
045: * this instance) periodically flushes the stream to our message-deliverer.
046: */
047: public class OutputPipe {
048:
049: private final LoggingService log;
050: private final Deliverer sender;
051: private final Map metaData;
052: private final long nagle;
053:
054: private final OutputStreamImpl out;
055:
056: private final Schedulable thread;
057:
058: // a list of buffered ByteArrayOutputStreams and Tokens
059: private final List queue = new ArrayList();
060:
061: private boolean closed;
062:
063: private int counter = 0;
064:
065: /**
066: * @param sender our output sender
067: *
068: * @param metaData optional meta-data to be sent in the first "deliver" call.
069: *
070: * @param nagle use -1 to send on every write/flush/close, 0 to only send on
071: * the close, or a positive number to only send once that many milliseconds have
072: * passed or a close has occured.
073: */
074: public OutputPipe(String threadName, LoggingService log,
075: ThreadService threadService, Deliverer sender,
076: Map metaData, long nagle) {
077: this .log = log;
078: this .sender = sender;
079: this .metaData = metaData;
080: this .nagle = nagle;
081: this .out = new OutputStreamImpl();
082:
083: if (sender == null) {
084: throw new IllegalArgumentException("null sender");
085: }
086:
087: if (nagle == 0) {
088: // only call the sender when we're done, in the "close()" thread
089: thread = null;
090: } else {
091: Runnable r = new Runnable() {
092: public void run() {
093: checkQueue();
094: }
095: };
096: // we may block up to our nagle duration
097: thread = threadService.getThread(this , r, threadName,
098: ThreadService.WILL_BLOCK_LANE);
099: }
100: }
101:
102: private void checkQueue() {
103: if (log.isDebugEnabled()) {
104: log.debug("checkQueue");
105: }
106:
107: // take data off queue
108: List data = null;
109: synchronized (queue) {
110: if (nagle < 0) {
111: if (queue.isEmpty())
112: return;
113: // take whatever's there, even if it's only a single byte
114: } else {
115: // wait a while, until either the nagle period expires or the stream
116: // is closed (whichever comes first)
117: //
118: // TODO support min/max nagle and a periodic NOOP as a keep-alive
119: if (!closed) {
120: try {
121: if (log.isDebugEnabled()) {
122: log.debug("wait " + nagle);
123: }
124: queue.wait(nagle);
125: } catch (InterruptedException e) {
126: }
127: }
128: if (queue.isEmpty())
129: return;
130: }
131: data = new ArrayList(queue);
132: queue.clear();
133: }
134:
135: // convert baos's to byte[]s, to make them serializable
136: for (int i = 0; i < data.size(); i++) {
137: Object oi = data.get(i);
138: if (oi instanceof ByteArrayOutputStream) {
139: byte[] b = ((ByteArrayOutputStream) oi).toByteArray();
140: data.set(i, b);
141: }
142: }
143:
144: // send
145: sender.deliver(counter, (counter == 0 ? metaData : null),
146: Collections.unmodifiableList(data));
147: counter++;
148: }
149:
150: /**
151: * @return an output stream
152: */
153: public AnnotatedOutputStream getOutputStream() {
154: return out;
155: }
156:
157: public void close() {
158: out.close();
159: }
160:
161: private class OutputStreamImpl extends AnnotatedOutputStream {
162:
163: public void write(int b) {
164: synchronized (queue) {
165: getBuffer().write(b);
166: }
167: if (thread != null) {
168: thread.start();
169: }
170: }
171:
172: public void write(byte[] b) {
173: write(b, 0, b.length);
174: }
175:
176: public void write(byte[] b, int off, int len) {
177: if (b == null) {
178: throw new NullPointerException();
179: } else if ((off < 0) || (off > b.length) || (len < 0)
180: || ((off + len) > b.length) || ((off + len) < 0)) {
181: throw new IndexOutOfBoundsException();
182: } else if (len == 0) {
183: return;
184: }
185:
186: synchronized (queue) {
187: getBuffer().write(b, off, len);
188: }
189: if (thread != null) {
190: thread.start();
191: }
192: }
193:
194: public void print(String s) {
195: // this is optional. The base class writes each individual character,
196: // which is space-efficient but time-inefficient.
197: write(s.getBytes());
198: }
199:
200: private ByteArrayOutputStream getBuffer() {
201: assert Thread.holdsLock(queue);
202: if (closed) {
203: throw new IllegalStateException("closed");
204: }
205: Object o = (queue.isEmpty() ? null : queue
206: .get(queue.size() - 1));
207: ByteArrayOutputStream buf;
208: if (o instanceof ByteArrayOutputStream) {
209: buf = (ByteArrayOutputStream) o;
210: } else {
211: buf = new ByteArrayOutputStream();
212: queue.add(buf);
213: }
214: return buf;
215: }
216:
217: public void flush() {
218: synchronized (queue) {
219: if (closed)
220: return;
221: Object last = (queue.isEmpty() ? null : queue.get(queue
222: .size() - 1));
223: if (last == Tokens.FLUSH) {
224: // ignore second flush in a row
225: return;
226: }
227: queue.add(Tokens.FLUSH);
228: }
229: if (thread != null) {
230: thread.start();
231: }
232: }
233:
234: // since we're message-based, when we're done writing/flushing then
235: // we should close the stream.
236: //
237: // This optimizes the tunnel by avoiding a separate, trivial, single-item
238: // "close" message after the client has read our response.
239: public void done() {
240: close();
241: }
242:
243: public void close() {
244: List data = null;
245: synchronized (queue) {
246: if (closed)
247: return;
248: closed = true;
249: queue.add(Tokens.CLOSE);
250: if (thread == null) {
251: data = new ArrayList(queue);
252: queue.clear();
253: } else {
254: queue.notifyAll();
255: }
256: }
257: if (thread == null) {
258: sender.deliver(0, metaData, Collections
259: .unmodifiableList(data));
260: } else {
261: thread.start();
262: }
263: }
264: }
265: }
|