001: /*
002: * Jython Database Specification API 2.0
003: *
004: * $Id: Pipe.java 2414 2005-02-23 04:26:23Z bzimmer $
005: *
006: * Copyright (c) 2001 brian zimmer <bzimmer@ziclix.com>
007: *
008: */
009: package com.ziclix.python.sql.pipe;
010:
011: import org.python.core.*;
012: import com.ziclix.python.sql.*;
013: import com.ziclix.python.sql.util.*;
014:
015: /**
016: * Manager for a Sink and Source. The Pipe creates a Queue through which the Source
017: * can feed data to the Sink. Both Sink and Source run in their own thread and can
018: * are completely independent of the other. When the Source pushes None onto the
019: * Queue, the piping is stopped and the Sink finishes processing all the remaining
020: * data. This class is especially useful for loading/copying data from one database
021: * or table to another.
022: *
023: * @author brian zimmer
024: * @version $Revision: 2414 $
025: */
026: public class Pipe {
027:
028: /**
029: * Default empty constructor.
030: */
031: public Pipe() {
032: }
033:
034: /**
035: * Start the processing of the Source->Sink.
036: *
037: * @param source the data generator
038: * @param sink the consumer of the data
039: * @return the number of rows seen (this includes the header row)
040: */
041: public PyObject pipe(Source source, Sink sink) {
042:
043: Queue queue = new Queue();
044: SourceRunner sourceRunner = new SourceRunner(queue, source);
045: SinkRunner sinkRunner = new SinkRunner(queue, sink);
046:
047: sourceRunner.start();
048: sinkRunner.start();
049:
050: try {
051: sourceRunner.join();
052: } catch (InterruptedException e) {
053: queue.close();
054:
055: throw zxJDBC.makeException(e);
056: }
057:
058: try {
059: sinkRunner.join();
060: } catch (InterruptedException e) {
061: queue.close();
062:
063: throw zxJDBC.makeException(e);
064: }
065:
066: /*
067: * This is interesting territory. I originally tried to store the the Throwable in the Thread instance
068: * and then re-throw it here, but whenever I tried, I would get an NPE in the construction of the
069: * PyTraceback required for the PyException. I tried calling .fillInStackTrace() but that didn't work
070: * either. So I'm left with getting the String representation and throwing that. At least it gives
071: * the relevant error messages, but the stack is lost. This might have something to do with a Java
072: * issue I don't completely understand, such as what happens for an Exception whose Thread is no longer
073: * running? Anyways, if anyone knows what to do I would love to hear about it.
074: */
075: if (sourceRunner.threwException()) {
076: throw zxJDBC.makeException(sourceRunner.getException()
077: .toString());
078: }
079:
080: if (sinkRunner.threwException()) {
081: throw zxJDBC.makeException(sinkRunner.getException()
082: .toString());
083: }
084:
085: // if the source count is -1, no rows were queried
086: if (sinkRunner.getCount() == 0) {
087: return Py.newInteger(0);
088: }
089:
090: // Assert that both sides handled the same number of rows. I know doing the check up front kinda defeats
091: // the purpose of the assert, but there's no need to create the buffer if I don't need it and I still
092: // want to throw the AssertionError if required
093: if ((sourceRunner.getCount() - sinkRunner.getCount()) != 0) {
094: Integer[] counts = { new Integer(sourceRunner.getCount()),
095: new Integer(sinkRunner.getCount()) };
096: String msg = zxJDBC.getString("inconsistentRowCount",
097: counts);
098:
099: Py.assert_(Py.Zero, Py.newString(msg));
100: }
101:
102: return Py.newInteger(sinkRunner.getCount());
103: }
104: }
105:
106: /**
107: * Class PipeRunner
108: *
109: * @author
110: * @author last modified by $Author: bzimmer $
111: * @version $Revision: 2414 $
112: * @date $today.date$
113: * @date last modified on $Date: 2005-02-22 20:26:23 -0800 (Tue, 22 Feb 2005) $
114: * @copyright 2001 brian zimmer
115: */
116: abstract class PipeRunner extends Thread {
117:
118: /**
119: * Field counter
120: */
121: protected int counter;
122:
123: /**
124: * Field queue
125: */
126: protected Queue queue;
127:
128: /**
129: * Field exception
130: */
131: protected Throwable exception;
132:
133: /**
134: * Constructor PipeRunner
135: *
136: * @param Queue queue
137: */
138: public PipeRunner(Queue queue) {
139:
140: this .counter = 0;
141: this .queue = queue;
142: this .exception = null;
143: }
144:
145: /**
146: * The total number of rows handled.
147: */
148: public int getCount() {
149: return this .counter;
150: }
151:
152: /**
153: * Method run
154: */
155: public void run() {
156:
157: try {
158: this .pipe();
159: } catch (QueueClosedException e) {
160:
161: /*
162: * thrown by a closed queue when any operation is performed. we know
163: * at this point that nothing else can happen to the queue and that
164: * both producer and consumer will stop since one closed the queue
165: * by throwing an exception (below) and the other is here.
166: */
167: return;
168: } catch (Throwable e) {
169: this .exception = e.fillInStackTrace();
170:
171: this .queue.close();
172: }
173: }
174:
175: /**
176: * Handle the source/destination specific copying.
177: */
178: abstract protected void pipe() throws InterruptedException;
179:
180: /**
181: * Return true if the thread terminated because of an uncaught exception.
182: */
183: public boolean threwException() {
184: return this .exception != null;
185: }
186:
187: /**
188: * Return the uncaught exception.
189: */
190: public Throwable getException() {
191: return this .exception;
192: }
193: }
194:
195: /**
196: * Class SourceRunner
197: *
198: * @author
199: * @author last modified by $Author: bzimmer $
200: * @version $Revision: 2414 $
201: * @date $today.date$
202: * @date last modified on $Date: 2005-02-22 20:26:23 -0800 (Tue, 22 Feb 2005) $
203: * @copyright 2001 brian zimmer
204: */
205: class SourceRunner extends PipeRunner {
206:
207: /**
208: * Field source
209: */
210: protected Source source;
211:
212: /**
213: * Constructor SourceRunner
214: *
215: * @param Queue queue
216: * @param Source source
217: */
218: public SourceRunner(Queue queue, Source source) {
219:
220: super (queue);
221:
222: this .source = source;
223: }
224:
225: /**
226: * Method pipe
227: *
228: * @throws InterruptedException
229: */
230: protected void pipe() throws InterruptedException {
231:
232: PyObject row = Py.None;
233:
234: this .source.start();
235:
236: try {
237: while ((row = this .source.next()) != Py.None) {
238: this .queue.enqueue(row);
239:
240: this .counter++;
241: }
242: } finally {
243: try {
244: this .queue.enqueue(Py.None);
245: } finally {
246: this .source.end();
247: }
248: }
249: }
250: }
251:
252: /**
253: * Class SinkRunner
254: *
255: * @author
256: * @author last modified by $Author: bzimmer $
257: * @version $Revision: 2414 $
258: * @date $today.date$
259: * @date last modified on $Date: 2005-02-22 20:26:23 -0800 (Tue, 22 Feb 2005) $
260: * @copyright 2001 brian zimmer
261: */
262: class SinkRunner extends PipeRunner {
263:
264: /**
265: * Field sink
266: */
267: protected Sink sink;
268:
269: /**
270: * Constructor SinkRunner
271: *
272: * @param Queue queue
273: * @param Sink sink
274: */
275: public SinkRunner(Queue queue, Sink sink) {
276:
277: super (queue);
278:
279: this .sink = sink;
280: }
281:
282: /**
283: * Method pipe
284: *
285: * @throws InterruptedException
286: */
287: protected void pipe() throws InterruptedException {
288:
289: PyObject row = Py.None;
290:
291: this .sink.start();
292:
293: try {
294: while ((row = (PyObject) this.queue.dequeue()) != Py.None) {
295: this.sink.row(row);
296:
297: this.counter++;
298: }
299: } finally {
300: this.sink.end();
301: }
302: }
303: }
|