001: /*
002: * Jython Database Specification API 2.0
003: *
004: * $Id: DBSink.java 2414 2005-02-23 04:26:23Z bzimmer $
005: *
006: * Copyright (c) 2001 brian zimmer <bzimmer@ziclix.com>
007: *
008: */
009: package com.ziclix.python.sql.pipe.db;
010:
import com.ziclix.python.sql.PyConnection;
import com.ziclix.python.sql.pipe.Sink;
import com.ziclix.python.sql.zxJDBC;
import org.python.core.Py;
import org.python.core.PyDictionary;
import org.python.core.PyList;
import org.python.core.PyObject;
import org.python.core.PyString;
import org.python.core.PyTuple;

import java.util.HashSet;
import java.util.Set;
022:
023: /**
024: * A database consumer. All data transferred will be inserted into the appropriate table.
025: *
026: * @author brian zimmer
027: * @version $Revision: 2414 $
028: */
029: public class DBSink extends BaseDB implements Sink {
030:
031: /**
032: * Field sql
033: */
034: protected PyObject sql;
035:
036: /**
037: * Field exclude
038: */
039: protected Set exclude;
040:
041: /**
042: * Field rows
043: */
044: protected PyList rows;
045:
046: /**
047: * Field batchsize
048: */
049: protected int batchsize;
050:
051: /**
052: * Field bindings
053: */
054: protected PyObject bindings;
055:
056: /**
057: * Field indexedBindings
058: */
059: protected PyDictionary indexedBindings;
060:
061: /**
062: * Constructor for handling the consumption of data.
063: *
064: * @param connection the database connection
065: * @param dataHandler a custom DataHandler for the cursor, can be None
066: * @param tableName the table to insert the data
067: * @param exclude the columns to be excluded from insertion on the destination, all if None
068: * @param bindings the optional bindings for the destination, this allows morphing of types during the copy
069: * @param batchsize the optional batchsize for the inserts
070: */
071: public DBSink(PyConnection connection, Class dataHandler,
072: String tableName, PyObject exclude, PyObject bindings,
073: int batchsize) {
074:
075: super (connection, dataHandler, tableName);
076:
077: this .sql = Py.None;
078: this .rows = new PyList();
079: this .bindings = bindings;
080: this .batchsize = batchsize;
081: this .exclude = new HashSet();
082: this .indexedBindings = new PyDictionary();
083:
084: if (exclude != Py.None) {
085: for (int i = 0; i < exclude.__len__(); i++) {
086: PyObject lowered = Py.newString(((PyString) exclude
087: .__getitem__(i)).lower());
088:
089: this .exclude.add(lowered);
090: }
091: }
092: }
093:
094: /**
095: * Return true if the key (converted to lowercase) is not found in the exclude list.
096: */
097: protected boolean excluded(PyObject key) {
098:
099: PyObject lowered = Py.newString(((PyString) key).lower());
100:
101: return this .exclude.contains(lowered);
102: }
103:
104: /**
105: * Create the insert statement given the header row.
106: */
107: protected void createSql(PyObject row) {
108:
109: // this should be the column info
110: if ((row == Py.None) || (row.__len__() == 0)) {
111:
112: // if there are no columns, what's the point?
113: throw zxJDBC.makeException(zxJDBC.getString("noColInfo"));
114: }
115:
116: int index = 0, len = row.__len__();
117: PyObject entry = Py.None, col = Py.None, pyIndex = Py.None;
118: StringBuffer sb = new StringBuffer("insert into ").append(
119: this .tableName).append(" (");
120:
121: /*
122: * Iterate through the columns and pull out the names for use in the insert
123: * statement and the types for use in the bindings. The tuple is of the form
124: * (column name, column type).
125: */
126: for (int i = 0; i < len - 1; i++) {
127: entry = row.__getitem__(i);
128: col = entry.__getitem__(0);
129:
130: if (!this .excluded(col)) {
131:
132: // add to the list
133: sb.append(col).append(",");
134:
135: // add the binding
136: pyIndex = Py.newInteger(index++);
137:
138: try {
139: this .indexedBindings.__setitem__(pyIndex,
140: this .bindings.__getitem__(col));
141: } catch (Exception e) {
142:
143: // either a KeyError or this.bindings is None or null
144: this .indexedBindings.__setitem__(pyIndex, entry
145: .__getitem__(1));
146: }
147: }
148: }
149:
150: entry = row.__getitem__(len - 1);
151: col = entry.__getitem__(0);
152:
153: if (!this .excluded(col)) {
154: sb.append(col);
155:
156: pyIndex = Py.newInteger(index++);
157:
158: try {
159: this .indexedBindings.__setitem__(pyIndex, this .bindings
160: .__getitem__(col));
161: } catch (Exception e) {
162:
163: // either a KeyError or this.bindings is None or null
164: this .indexedBindings.__setitem__(pyIndex, entry
165: .__getitem__(1));
166: }
167: }
168:
169: sb.append(") values (");
170:
171: for (int i = 1; i < len; i++) {
172: sb.append("?,");
173: }
174:
175: sb.append("?)");
176:
177: if (index == 0) {
178: throw zxJDBC.makeException(zxJDBC.ProgrammingError, zxJDBC
179: .getString("excludedAllCols"));
180: }
181:
182: this .sql = Py.newString(sb.toString());
183: }
184:
185: /**
186: * Handle the row. Insert the data into the correct table and columns. No updates are done.
187: */
188: public void row(PyObject row) {
189:
190: if (this .sql != Py.None) {
191: if (this .batchsize <= 0) {
192:
193: // no batching, just go ahead each time
194: this .cursor.execute(this .sql, row,
195: this .indexedBindings, Py.None);
196: this .connection.commit();
197: } else {
198: this .rows.append(row);
199:
200: int len = rows.__len__();
201:
202: if (len % this .batchsize == 0) {
203: this .cursor.execute(this .sql, this .rows,
204: this .indexedBindings, Py.None);
205: this .connection.commit();
206:
207: this .rows = new PyList();
208: }
209: }
210: } else {
211: this .createSql(row);
212: }
213: }
214:
215: /**
216: * Method start
217: */
218: public void start() {
219: }
220:
221: /**
222: * Handles flushing any buffers and closes the cursor.
223: */
224: public void end() {
225:
226: // finish what we started
227: try {
228: int len = this .rows.__len__();
229:
230: if (len > 0) {
231: this.cursor.execute(this.sql, this.rows,
232: this.indexedBindings, Py.None);
233: this.connection.commit();
234: }
235: } finally {
236: this.cursor.close();
237: }
238: }
239: }
|