Source code for Pipe.java (Jython, package com.ziclix.python.sql.pipe)

/*
 * Jython Database Specification API 2.0
 *
 * $Id: Pipe.java 2414 2005-02-23 04:26:23Z bzimmer $
 *
 * Copyright (c) 2001 brian zimmer <bzimmer@ziclix.com>
 *
 */
package com.ziclix.python.sql.pipe;

import org.python.core.*;
import com.ziclix.python.sql.*;
import com.ziclix.python.sql.util.*;

/**
 * Manager for a Sink and Source.  The Pipe creates a Queue through which the Source
 * can feed data to the Sink.  Both Sink and Source run in their own thread and
 * are completely independent of each other.  When the Source pushes None onto the
 * Queue, the piping is stopped and the Sink finishes processing all the remaining
 * data.  This class is especially useful for loading/copying data from one database
 * or table to another.
 *
 * @author brian zimmer
 * @version $Revision: 2414 $
 */
public class Pipe {

    /**
     * Default empty constructor.
     */
    public Pipe() {
    }

    /**
     * Start the processing of the Source->Sink.
     *
     * @param source the data generator
     * @param sink   the consumer of the data
     * @return the number of rows seen (this includes the header row)
     */
    public PyObject pipe(Source source, Sink sink) {

        Queue queue = new Queue();
        SourceRunner sourceRunner = new SourceRunner(queue, source);
        SinkRunner sinkRunner = new SinkRunner(queue, sink);

        sourceRunner.start();
        sinkRunner.start();

        try {
            sourceRunner.join();
        } catch (InterruptedException e) {
            queue.close();

            throw zxJDBC.makeException(e);
        }

        try {
            sinkRunner.join();
        } catch (InterruptedException e) {
            queue.close();

            throw zxJDBC.makeException(e);
        }

        /*
         * This is interesting territory.  I originally tried to store the Throwable in the Thread instance
         * and then re-throw it here, but whenever I tried, I would get an NPE in the construction of the
         * PyTraceback required for the PyException.  I tried calling .fillInStackTrace() but that didn't work
         * either.  So I'm left with getting the String representation and throwing that.  At least it gives
         * the relevant error messages, but the stack is lost.  This might have something to do with a Java
         * issue I don't completely understand, such as what happens to an Exception whose Thread is no longer
         * running.  Anyway, if anyone knows what to do I would love to hear about it.
         */
        if (sourceRunner.threwException()) {
            throw zxJDBC.makeException(sourceRunner.getException()
                    .toString());
        }

        if (sinkRunner.threwException()) {
            throw zxJDBC.makeException(sinkRunner.getException()
                    .toString());
        }

        // if the sink saw no rows, report zero and skip the consistency check
        if (sinkRunner.getCount() == 0) {
            return Py.newInteger(0);
        }

        // Assert that both sides handled the same number of rows.  I know doing the check up front kinda defeats
        // the purpose of the assert, but there's no need to create the buffer if I don't need it and I still
        // want to throw the AssertionError if required
        if ((sourceRunner.getCount() - sinkRunner.getCount()) != 0) {
            Integer[] counts = { new Integer(sourceRunner.getCount()),
                    new Integer(sinkRunner.getCount()) };
            String msg = zxJDBC.getString("inconsistentRowCount",
                    counts);

            Py.assert_(Py.Zero, Py.newString(msg));
        }

        return Py.newInteger(sinkRunner.getCount());
    }
}

/**
 * Class PipeRunner.  Base class for the threads that run each end of the pipe.
 *
 * @author
 * @author last modified by $Author: bzimmer $
 * @version $Revision: 2414 $
 * @date $today.date$
 * @date last modified on $Date: 2005-02-22 20:26:23 -0800 (Tue, 22 Feb 2005) $
 * @copyright 2001 brian zimmer
 */
abstract class PipeRunner extends Thread {

    /**
     * Field counter
     */
    protected int counter;

    /**
     * Field queue
     */
    protected Queue queue;

    /**
     * Field exception
     */
    protected Throwable exception;

    /**
     * Constructor PipeRunner
     *
     * @param queue the queue shared by the source and the sink
     */
    public PipeRunner(Queue queue) {

        this.counter = 0;
        this.queue = queue;
        this.exception = null;
    }

    /**
     * The total number of rows handled.
     */
    public int getCount() {
        return this.counter;
    }

    /**
     * Method run
     */
    public void run() {

        try {
            this.pipe();
        } catch (QueueClosedException e) {

            /*
             * Thrown by a closed queue when any operation is performed.  We know
             * at this point that nothing else can happen to the queue and that
             * both producer and consumer will stop, since one closed the queue
             * by throwing an exception (below) and the other is here.
             */
            return;
        } catch (Throwable e) {
            this.exception = e.fillInStackTrace();

            this.queue.close();
        }
    }

    /**
     * Handle the source/destination specific copying.
     */
    abstract protected void pipe() throws InterruptedException;

    /**
     * Return true if the thread terminated because of an uncaught exception.
     */
    public boolean threwException() {
        return this.exception != null;
    }

    /**
     * Return the uncaught exception.
     */
    public Throwable getException() {
        return this.exception;
    }
}

/**
 * Class SourceRunner.  Reads rows from the Source and enqueues them for the sink.
 *
 * @author
 * @author last modified by $Author: bzimmer $
 * @version $Revision: 2414 $
 * @date $today.date$
 * @date last modified on $Date: 2005-02-22 20:26:23 -0800 (Tue, 22 Feb 2005) $
 * @copyright 2001 brian zimmer
 */
class SourceRunner extends PipeRunner {

    /**
     * Field source
     */
    protected Source source;

    /**
     * Constructor SourceRunner
     *
     * @param queue  the queue to feed
     * @param source the data generator
     */
    public SourceRunner(Queue queue, Source source) {

        super(queue);

        this.source = source;
    }

    /**
     * Method pipe
     *
     * @throws InterruptedException
     */
    protected void pipe() throws InterruptedException {

        PyObject row = Py.None;

        this.source.start();

        try {
            while ((row = this.source.next()) != Py.None) {
                this.queue.enqueue(row);

                this.counter++;
            }
        } finally {
            try {
                // always signal the end of the data, even if the source failed
                this.queue.enqueue(Py.None);
            } finally {
                this.source.end();
            }
        }
    }
}

/**
 * Class SinkRunner.  Dequeues rows and hands them to the Sink until None arrives.
 *
 * @author
 * @author last modified by $Author: bzimmer $
 * @version $Revision: 2414 $
 * @date $today.date$
 * @date last modified on $Date: 2005-02-22 20:26:23 -0800 (Tue, 22 Feb 2005) $
 * @copyright 2001 brian zimmer
 */
class SinkRunner extends PipeRunner {

    /**
     * Field sink
     */
    protected Sink sink;

    /**
     * Constructor SinkRunner
     *
     * @param queue the queue to drain
     * @param sink  the consumer of the data
     */
    public SinkRunner(Queue queue, Sink sink) {

        super(queue);

        this.sink = sink;
    }

    /**
     * Method pipe
     *
     * @throws InterruptedException
     */
    protected void pipe() throws InterruptedException {

        PyObject row = Py.None;

        this.sink.start();

        try {
            while ((row = (PyObject) this.queue.dequeue()) != Py.None) {
                this.sink.row(row);

                this.counter++;
            }
        } finally {
            this.sink.end();
        }
    }
}
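
The listing above moves rows from a producer thread to a consumer thread through a queue, ends the stream with a sentinel value (Py.None), and reports a worker failure to the caller after both threads have been joined. The following is a minimal standalone sketch of that same pattern, not the zxJDBC implementation: it swaps in java.util.concurrent.LinkedBlockingQueue for the library's own Queue class, and the names PipeSketch, END, and the Iterator/Consumer parameters are hypothetical stand-ins for Source and Sink. It also shows one possible way to keep the original Throwable as the cause of the rethrown exception, under the assumption that a plain Java exception is acceptable to the caller.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of zxJDBC.
class PipeSketch {

    // Sentinel marking the end of the data, playing the role of Py.None above.
    private static final Object END = new Object();

    /** Copies every row from source to sink through a queue and returns the rows consumed. */
    static int pipe(Iterator<?> source, Consumer<Object> sink) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        int[] consumed = {0};

        Thread producer = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    queue.put(source.next());
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);   // remember the first failure only
            } finally {
                // Always enqueue the sentinel so the consumer cannot wait forever.
                // The queue is unbounded, so add() neither blocks nor fails here.
                queue.add(END);
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (Object row = queue.take(); row != END; row = queue.take()) {
                    sink.accept(row);
                    consumed[0]++;
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();   // join() makes the workers' writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while piping", e);
        }

        Throwable t = failure.get();
        if (t != null) {
            // Passing t as the cause keeps the worker's stack trace available.
            throw new IllegalStateException("pipe failed: " + t, t);
        }
        return consumed[0];
    }

    public static void main(String[] args) {
        int rows = pipe(Arrays.asList("a", "b", "c").iterator(), row -> { });
        System.out.println("rows piped: " + rows);
    }
}
```

Unlike the listing, this sketch has no closable queue: if the sink fails, the producer simply runs to the end of its input before the failure is reported. A bounded queue would need an explicit shutdown signal, which is what Queue.close() and QueueClosedException provide in the original.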