001: /*
002: (c) Copyright 2002, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Hewlett-Packard Development Company, LP
003: [See end of file]
004: $Id: BufferPipe.java,v 1.12 2008/01/02 12:07:57 andy_seaborne Exp $
005: */
006:
007: package com.hp.hpl.jena.graph.query;
008:
009: import EDU.oswego.cs.dl.util.concurrent.*;
010: import com.hp.hpl.jena.shared.*;
011:
012: import java.util.*;
013:
014: /**
015: This class is a pipe between query threads, implemented as a bounded buffer.
016: @author kers
017: */
018: public class BufferPipe implements Pipe {
019: private boolean open = true;
020: private BoundedBuffer buffer = new BoundedBuffer(5);
021: private Object pending = null;
022:
023: public static class Finished {
024: protected RuntimeException e;
025:
026: public Finished() {
027: }
028:
029: public Finished(Exception e) {
030: this .e = new QueryStageException(e);
031: }
032:
033: public RuntimeException getCause() {
034: return e;
035: }
036: }
037:
038: private static final Finished finished = new Finished();
039:
040: public BufferPipe() {
041: }
042:
043: /** put something into the pipe; take care of BoundedBuffer's checked exceptions */
044: private Object fetch() {
045: try {
046: return buffer.take();
047: } catch (Exception e) {
048: throw new BoundedBufferTakeException(e);
049: }
050: }
051:
052: /** get something from the pipe; take care of BoundedBuffer's checked exceptions */
053: private void putAny(Object d) {
054: try {
055: buffer.put(d);
056: return;
057: } catch (Exception e) {
058: throw new BoundedBufferPutException(e);
059: }
060: }
061:
062: public void put(Domain d) {
063: putAny(d);
064: }
065:
066: public void close() {
067: putAny(finished);
068: }
069:
070: public void close(Exception e) {
071: putAny(new Finished(e));
072: }
073:
074: public boolean hasNext() {
075: if (open) {
076: if (pending == null) {
077: pending = fetch();
078: if (pending instanceof Finished) {
079: Finished end = (Finished) pending;
080: RuntimeException cause = end.getCause();
081: if (cause == null)
082: open = false;
083: else {
084: PatternStageBase.log
085: .debug(
086: "BufferPipe has recieved and rethrown an exception",
087: cause);
088: throw cause;
089: }
090: }
091: return open;
092: } else
093: return true;
094: } else
095: return false;
096: }
097:
098: public Domain get() {
099: if (hasNext() == false)
100: throw new NoSuchElementException();
101: if (!(pending instanceof Domain))
102: throw new RuntimeException(pending.getClass().toString());
103: try {
104: return (Domain) pending;
105: } finally {
106: pending = null;
107: }
108: }
109:
110: /**
111: Exception to throw if a <code>take</code> throws an exception.
112: */
113: public static class BoundedBufferTakeException extends
114: JenaException {
115: BoundedBufferTakeException(Exception e) {
116: super (e);
117: }
118: }
119:
120: /**
121: Exception to throw if a <code>put</code> throws an exception.
122: */
123: public static class BoundedBufferPutException extends JenaException {
124: BoundedBufferPutException(Exception e) {
125: super (e);
126: }
127: }
128: }
129: /*
130: (c) Copyright 2002, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Hewlett-Packard Development Company, LP
131: All rights reserved.
132:
133: Redistribution and use in source and binary forms, with or without
134: modification, are permitted provided that the following conditions
135: are met:
136:
137: 1. Redistributions of source code must retain the above copyright
138: notice, this list of conditions and the following disclaimer.
139:
140: 2. Redistributions in binary form must reproduce the above copyright
141: notice, this list of conditions and the following disclaimer in the
142: documentation and/or other materials provided with the distribution.
143:
144: 3. The name of the author may not be used to endorse or promote products
145: derived from this software without specific prior written permission.
146:
147: THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
148: IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
149: OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
150: IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
151: INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
152: NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
153: DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
154: THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
155: (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
156: THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
157: */
|