001: /*
002: * $Id: XMLEventPipe.java,v 1.2 2004/07/05 23:46:51 cniles Exp $
003: *
004: * Copyright (c) 2004, Christian Niles, Unit12
005: * All rights reserved.
006: *
007: * Redistribution and use in source and binary forms, with or without
008: * modification, are permitted provided that the following conditions are met:
009: *
010: * * Redistributions of source code must retain the above copyright
011: * notice, this list of conditions and the following disclaimer.
012: *
013: * * Redistributions in binary form must reproduce the above copyright
014: * notice, this list of conditions and the following disclaimer in the
015: * documentation and/or other materials provided with the distribution.
016: *
017: * * Neither the name of Christian Niles, Unit12, nor the names of its
018: * contributors may be used to endorse or promote products derived from
019: * this software without specific prior written permission.
020: *
021: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
022: * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
023: * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
024: * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
025: * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
026: * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
027: * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
028: * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
029: * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
030: * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
031: * POSSIBILITY OF SUCH DAMAGE.
032: *
033: */
034: package javanet.staxutils;
035:
036: import java.util.LinkedList;
037: import java.util.List;
038: import java.util.NoSuchElementException;
039:
040: import javax.xml.stream.XMLEventReader;
041: import javax.xml.stream.XMLEventWriter;
042: import javax.xml.stream.XMLStreamException;
043: import javax.xml.stream.events.XMLEvent;
044:
045: /**
046: * Provides the ability to pipe the {@link XMLEvent}s written to one
047: * {@link XMLEventWriter} to be read from a {@link XMLEventReader}. The
048: * implementation is based on a bounded-buffer with a specifiable maximum
049: * capacity. When that capacity is reached, the write end of the pipe will
050: * block until events are read from the read end. Similarly, when an attempt is
051: * made to read from an empty queue, the operation will block until more events
052: * are written to the buffer. The write end of the pipe will repair namespaces
053: * and buffer attribute/namespace events as defined in the specification of
054: * the {@link XMLEventWriter} interface.
055: * <br /><br />
056: * Both the read and write ends of this pipe are fully synchronized to allow
057: * multiple threads to read or write events to the pipe. However, care must be
058: * taken that the order of events is consistent, and that the stream is properly
059: * closed when writing is complete. <b>If the write end is never closed the read
060: * end may block indefinitely, waiting for further events.</b> To help prevent
061: * this, the write end will automatically close when an END_DOCUMENT event is
062: * written.
063: * <br /><br />
064: * To properly obey the expected behaviour of {@link XMLEventReader} and
065: * {@link javax.xml.stream.XMLStreamWriter}, methods such as
066: * {@link XMLEventReader#peek()} and {@link XMLEventReader#hasNext()} may block.
067: * This is necessary to prevent {@link XMLEventReader#hasNext()} from returning
068: * <code>true</code> just before the write end is closed, or <code>false</code>
069: * just before additional events are added. If the read end is closed before the
070: * writer, then the write end will silently discard all elements written to it
071: * until it is closed.
072: *
073: * @author Christian Niles
074: * @version $Revision: 1.2 $
075: */
076: public class XMLEventPipe {
077:
078: /**
079: * Default maximum number of events that may be stored by this pipe until
080: * the write end blocks.
081: */
082: public static final int QUEUE_CAPACITY = 16;
083:
084: /** List of events ready to be read. */
085: private List eventQueue = new LinkedList();
086:
087: /** The maximum capacity of the queue, after which the pipe should block. */
088: private int capacity = QUEUE_CAPACITY;
089:
090: /** Whether the read end has been closed. */
091: private boolean readEndClosed;
092:
093: /** Whether the write end has been closed. */
094: private boolean writeEndClosed;
095:
096: /**
097: * The read end of the pipe. This will be <code>null</code> until
098: * {@link #getReadEnd()} is called for the first time.
099: */
100: private PipedXMLEventReader readEnd = new PipedXMLEventReader(this );
101:
102: /**
103: * The write end of the pipe. This will be <code>null</code> until
104: * {@link #getWriteEnd()} is called for the first time.
105: */
106: private PipedXMLEventWriter writeEnd = new PipedXMLEventWriter(this );
107:
108: /**
109: * Constructs a new XMLEventPipe with the default capacity.
110: */
111: public XMLEventPipe() {
112: }
113:
114: /**
115: * Constructs a new XMLEventPipe with the specified capacity.
116: *
117: * @param capacity The number of events to buffer until the pipe will block.
118: * A number less than or equal to 0 means the pipe will buffer an
119: * unbounded number of events.
120: */
121: public XMLEventPipe(int capacity) {
122:
123: this .capacity = capacity;
124:
125: }
126:
127: /**
128: * Returns the read end of the pipe, from which events written to the write
129: * end of the pipe will be available.
130: *
131: * @return The read end of the pipe.
132: */
133: public synchronized XMLEventReader getReadEnd() {
134:
135: if (readEnd == null) {
136:
137: readEnd = new PipedXMLEventReader(this );
138:
139: }
140:
141: return readEnd;
142:
143: }
144:
145: /**
146: * Returns the write end of the pipe, whose events will be available from
147: * the read end of this pipe.
148: *
149: * @return The write end of the pipe.
150: */
151: public synchronized XMLEventWriter getWriteEnd() {
152:
153: if (writeEnd == null) {
154:
155: writeEnd = new PipedXMLEventWriter(this );
156:
157: }
158:
159: return writeEnd;
160:
161: }
162:
163: /**
164: * {@link XMLEventWriter} implementation used to provide the write end of
165: * the pipe.
166: *
167: * @author christian
168: * @version $Revision: 1.2 $
169: */
170: private static final class PipedXMLEventWriter extends
171: BaseXMLEventWriter {
172:
173: /** The pipe we're connected to. */
174: private XMLEventPipe pipe;
175:
176: public PipedXMLEventWriter(XMLEventPipe pipe) {
177:
178: this .pipe = pipe;
179:
180: }
181:
182: public synchronized void close() throws XMLStreamException {
183:
184: super .close();
185:
186: synchronized (pipe) {
187:
188: if (pipe.readEndClosed) {
189:
190: pipe.eventQueue.clear();
191:
192: }
193:
194: pipe.writeEndClosed = true;
195: pipe.notifyAll();
196:
197: }
198:
199: }
200:
201: protected void sendEvent(XMLEvent event)
202: throws XMLStreamException {
203:
204: synchronized (pipe) {
205:
206: if (pipe.readEndClosed) {
207:
208: // if read end is closed, throw away event
209: return;
210:
211: }
212:
213: if (pipe.capacity > 0) {
214:
215: while (pipe.eventQueue.size() >= pipe.capacity) {
216:
217: try {
218:
219: pipe.wait();
220:
221: } catch (InterruptedException e) {
222:
223: e.printStackTrace();
224:
225: }
226:
227: }
228:
229: }
230:
231: pipe.eventQueue.add(event);
232: if (pipe.eventQueue.size() == 1) {
233:
234: pipe.notifyAll();
235:
236: }
237:
238: if (event.isEndDocument()) {
239:
240: close();
241:
242: }
243:
244: }
245:
246: }
247:
248: }
249:
250: /**
251: * {@link XMLEventReader} implementation used to provide the read end of
252: * the pipe.
253: *
254: * @author christian
255: * @version $Revision: 1.2 $
256: */
257: private static final class PipedXMLEventReader extends
258: BaseXMLEventReader {
259:
260: /** THe pipe this stream is connected to. */
261: private XMLEventPipe pipe;
262:
263: public PipedXMLEventReader(XMLEventPipe pipe) {
264:
265: this .pipe = pipe;
266:
267: }
268:
269: public synchronized XMLEvent nextEvent()
270: throws XMLStreamException {
271:
272: if (closed) {
273:
274: throw new XMLStreamException("Stream has been closed");
275:
276: }
277:
278: synchronized (pipe) {
279:
280: while (pipe.eventQueue.size() == 0) {
281:
282: if (pipe.writeEndClosed) {
283:
284: throw new NoSuchElementException(
285: "Stream has completed");
286:
287: }
288:
289: try {
290:
291: pipe.wait();
292:
293: } catch (InterruptedException e) {
294:
295: e.printStackTrace();
296:
297: }
298:
299: }
300:
301: boolean notify = pipe.capacity > 0
302: && pipe.eventQueue.size() >= pipe.capacity;
303:
304: // remove next event from the queue
305: XMLEvent nextEvent = (XMLEvent) pipe.eventQueue
306: .remove(0);
307: if (notify) {
308:
309: pipe.notifyAll();
310:
311: }
312: return nextEvent;
313:
314: }
315:
316: }
317:
318: public synchronized boolean hasNext() {
319:
320: if (closed) {
321:
322: return false;
323:
324: }
325:
326: synchronized (pipe) {
327:
328: while (pipe.eventQueue.size() == 0) {
329:
330: if (pipe.writeEndClosed) {
331:
332: break;
333:
334: }
335:
336: try {
337:
338: pipe.wait();
339:
340: } catch (InterruptedException e) {
341: }
342:
343: }
344:
345: return pipe.eventQueue.size() > 0;
346:
347: }
348:
349: }
350:
351: public synchronized XMLEvent peek() throws XMLStreamException {
352:
353: if (closed) {
354:
355: return null;
356:
357: }
358:
359: synchronized (pipe) {
360:
361: // wait until the queue has more events
362: while (pipe.eventQueue.size() == 0) {
363:
364: if (pipe.writeEndClosed) {
365:
366: return null;
367:
368: }
369:
370: try {
371:
372: pipe.wait();
373:
374: } catch (InterruptedException e) {
375: }
376:
377: }
378:
379: return (XMLEvent) pipe.eventQueue.get(0);
380:
381: }
382:
383: }
384:
385: public synchronized void close() throws XMLStreamException {
386:
387: if (closed) {
388:
389: return;
390:
391: }
392:
393: synchronized (pipe) {
394:
395: pipe.readEndClosed = true;
396: pipe.notifyAll();
397:
398: }
399:
400: super .close();
401:
402: }
403:
404: public void finalize() {
405:
406: if (!closed) {
407:
408: synchronized (pipe) {
409:
410: pipe.readEndClosed = true;
411: pipe.notifyAll();
412:
413: }
414:
415: }
416:
417: }
418:
419: }
420:
421: }
|