001: /*
002: * Enhydra Java Application Server Project
003: *
004: * The contents of this file are subject to the Enhydra Public License
005: * Version 1.1 (the "License"); you may not use this file except in
006: * compliance with the License. You may obtain a copy of the License on
007: * the Enhydra web site ( http://www.enhydra.org/ ).
008: *
009: * Software distributed under the License is distributed on an "AS IS"
010: * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
011: * the License for the specific terms governing rights and limitations
012: * under the License.
013: *
014: * The Initial Developer of the Enhydra Application Server is Lutris
015: * Technologies, Inc. The Enhydra Application Server and portions created
016: * by Lutris Technologies, Inc. are Copyright Lutris Technologies, Inc.
017: * All Rights Reserved.
018: *
019: * Contributor(s):
020: *
021: * $Id: OutputStreamEventQueue.java,v 1.2 2006-06-15 13:47:01 sinisa Exp $
022: */
023:
024: package com.lutris.util;
025:
026: import java.io.IOException;
027: import java.io.OutputStream;
028: import java.util.Date;
029: import java.util.Vector;
030:
031: /**
032: A queue of "events". Each event is a write to the OutputStream.
033: This class implements both the reader and writer half of the queue. <P>
034:
035: Each "event" is returned as an OutputStreamEventQueueEntry object. <P>
036:
037: The "writer" interface is the set of OutputStream calls. This class
038: extends OutputStream, overriding all the OutputStream methods.
039: All data written to this object is stored up as "events" in an internal
040: queue. Each write is a separate event, so sometimes you will get one
041: event with data followed by one with just a newline. You can pass objects
042: of this class to anything that expects and OutputStream. Calls to the
043: OutputStream methods always return immediatly. <P>
044:
045: You may optionally specify a maximum number of bytes to store. After
046: each write to the queue, if the new total number of bytes is larger than
047: this limit, then the oldest entries are discarded until the total is
048: at or below the limit. This could result in throwing out all the elements
049: in the queue. Only the number of bytes of data written by the
050: <CODE>write()</CODE> methods is counted, not the additional date
051: stamping and object encapsulation. If you specify a size limit of
052: zero, then no limit is imposed. <P>
053:
054: The "reader" interface is the getEvent() method. Calls to this method
055: will block until there is an event to return. Readers may also call the
056: hasEventsPending() method to see if there are any events. This call does
057: not block. <P>
058:
059: One possible use for this class is with the PrintTransactionFilter class.
060: If you pass in an instance of this class as the OutputStream, you may
061: then fetch the logging data in a convienent way. <P>
062:
063: @see com.lutris.util.OutputStreamEventQueueEntry
064: @see java.io.OutputStream
065: @see com.lutris.servlet.filter.PrintTransactionFilter
066: @author Andy John
067: */
068: public class OutputStreamEventQueue extends OutputStream {
069:
070: // Is this "stream" open or closed? Don't allow writes to a closed stream.
071: private boolean open;
072:
073: // My list of events.
074: private Vector queue;
075:
076: // The current amount of data stored in the queue.
077: private int numBytes;
078:
079: // The maximum number of bytes to be stores. Zero indicates no limit.
080: private int maxBytes;
081:
082: /**
083: Create a new, empty, OutputStreamEventQueue with no limit on the
084: size of the data stored.
085: */
086: public OutputStreamEventQueue() {
087: this (0);
088: }
089:
090: /**
091: Create a new, empty, OutputStreamEventQueue with a limit on the
092: number of bytes it will store.
093: */
094: public OutputStreamEventQueue(int maxBytes) {
095: this .maxBytes = maxBytes;
096: this .numBytes = 0;
097: open = true;
098: queue = new Vector();
099: }
100:
101: //--------------------------------------------------------------------
102: //
103: // Writer functions
104: //
105: //--------------------------------------------------------------------
106:
107: /**
108: Add an event to the queue containing one byte.
109:
110: @param b The byte to save in the queue.
111: @exception IOException If close() has been called.
112: @see java.io.OutputSream
113: */
114: public void write(int b) throws IOException {
115: synchronized (queue) {
116: if (!open)
117: throw new IOException(
118: "Wrote to closed OutputStreamEventQueue.");
119: OutputStreamEventQueueEntry evt = new OutputStreamEventQueueEntry();
120: evt.when = new Date();
121: evt.data = new byte[1];
122: evt.data[0] = (byte) b;
123: queue.addElement(evt);
124: this .numBytes += 4;
125: limitQueueSize();
126: queue.notify(); // Wake up one waiting reader.
127: }
128: }
129:
130: /**
131: Add an event to the queue containing an array of bytes.
132:
133: @param b The array of bytes to save in the queue.
134: @exception IOException If close() has been called.
135: @see java.io.OutputSream
136: */
137: public void write(byte b[]) throws IOException {
138: synchronized (queue) {
139: if (!open)
140: throw new IOException(
141: "Wrote to closed OutputStreamEventQueue.");
142: OutputStreamEventQueueEntry evt = new OutputStreamEventQueueEntry(
143: new Date(), b);
144: queue.addElement(evt);
145: this .numBytes += b.length;
146: limitQueueSize();
147: queue.notify(); // Wake up one waiting reader.
148: }
149: }
150:
151: /**
152: Add an event to the queue containing part of an array of bytes.
153: Bytes b[off]..b[off+len-1] are used.
154:
155: @param b The array of bytes to save part of in the queue.
156: @exception IOException If close() has been called.
157: @see java.io.OutputSream
158: */
159: public void write(byte b[], int off, int len) throws IOException {
160: synchronized (queue) {
161: if (!open)
162: throw new IOException(
163: "Wrote to closed OutputStreamEventQueue.");
164: OutputStreamEventQueueEntry evt = new OutputStreamEventQueueEntry();
165: evt.when = new Date();
166: byte[] c = new byte[len];
167: for (int i = 0; i < len; i++)
168: c[i] = b[i + off];
169: evt.data = c;
170: queue.addElement(evt);
171: this .numBytes += b.length;
172: limitQueueSize();
173: queue.notify(); // Wake up one waiting reader.
174: }
175: }
176:
177: /**
178: Does nothing. There is no buffering; every write immediatly adds an
179: event to the queue. This method is provided because OutputStream
180: does.
181:
182: @exception IOException If close() has been called.
183: @see java.io.OutputSream
184: */
185: public void flush() throws IOException {
186: synchronized (queue) {
187: if (!open)
188: throw new IOException(
189: "Flushed a closed OutputStreamEventQueue.");
190: }
191: }
192:
193: /**
194: Closes the OutputStream. It is no longer available for writing.
195: Any further writes will throw an IOException. <P>
196:
197: Closing the stream wakes up all the threads blocked on a read.
198: If the queue is empty, they will all notice this and return null.
199: This is your signal that there are no events and there never will be.
200:
201: @exception IOException If close() has already been called.
202: @see java.io.OutputSream
203: */
204: public void close() throws IOException {
205: synchronized (queue) {
206: if (!open)
207: throw new IOException(
208: "Closed an already closed OutputStreamEventQueue.");
209: open = false;
210: queue.notifyAll(); // Wake up all waiting readers, so
211: // the all notice we are closed.
212: }
213: }
214:
215: /*
216: Only call this if you are already synchronized on queue.
217: Throw out the oldest events untill we get down below our
218: limit on the amount of data we can hold.
219: This might leave the queue empty if the limit is small and
220: the newest entry has alot of data!
221: */
222: private void limitQueueSize() {
223: if (maxBytes == 0)
224: return;
225: while (numBytes > maxBytes) {
226: if (queue.size() <= 0) {
227: numBytes = 0;
228: return;
229: }
230: OutputStreamEventQueueEntry evt = (OutputStreamEventQueueEntry) queue
231: .firstElement();
232: queue.removeElement(evt);
233: numBytes -= evt.data.length;
234: if (numBytes < 0)
235: numBytes = 0;
236: }
237: }
238:
239: //--------------------------------------------------------------------
240: //
241: // Reader functions
242: //
243: //--------------------------------------------------------------------
244:
245: /**
246: Get an event from the queue. This will block until there is an
247: event to return. The data about the event is stored in a
248: OutputStreamEventQueueEntry object. <P>
249:
250: If null is returned, that means there are no more events, and the
251: stream is closed, so no further events will be generated.
252:
253: @return A OutputStreamEventQueueEntry object describing the event,
254: or null if the queue is empty and no more events are possible.
255: @see com.lutris.util.OutputStreamEventQueueEntry
256: */
257: public OutputStreamEventQueueEntry getEvent() {
258: while (true) {
259: // Try to return an entry.
260: synchronized (queue) {
261: // Are there currently any waiting entries?
262: if (queue.size() > 0) {
263: // Pop off the queue.
264: OutputStreamEventQueueEntry evt = (OutputStreamEventQueueEntry) queue
265: .firstElement();
266: queue.removeElement(evt);
267: numBytes -= evt.data.length;
268: if (numBytes < 0)
269: numBytes = 0;
270: return evt;
271: } else {
272: // Is it possible that more entries could show up?
273: if (!open)
274: return null;
275: }
276: // Since we didn't return something, wait.
277: try {
278: queue.wait();
279: } catch (InterruptedException e) {
280: }
281: }
282: }
283: }
284:
285: /**
286: Is there an event waiting in the queue right now? This call will
287: not block, however it is possible that another thread will take the
288: event before you do, so your call to getEvent may still block.
289:
290: @return True is there is an event waiting in the queue.
291: */
292: public boolean hasEventsPending() {
293: synchronized (queue) {
294: return (queue.size() > 0);
295: }
296: }
297: }
|