001: package org.apache.ojb.broker.util.batch;
002:
003: /* Copyright 2002-2005 The Apache Software Foundation
004: *
005: * Licensed under the Apache License, Version 2.0 (the "License");
006: * you may not use this file except in compliance with the License.
007: * You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: import java.lang.reflect.Proxy;
019: import java.sql.Connection;
020: import java.sql.PreparedStatement;
021: import java.sql.SQLException;
022: import java.sql.Statement;
023: import java.util.ArrayList;
024: import java.util.Collection;
025: import java.util.HashMap;
026: import java.util.HashSet;
027: import java.util.Iterator;
028: import java.util.List;
029:
030: import org.apache.ojb.broker.PersistenceBroker;
031: import org.apache.ojb.broker.metadata.ClassDescriptor;
032: import org.apache.ojb.broker.metadata.CollectionDescriptor;
033: import org.apache.ojb.broker.metadata.DescriptorRepository;
034: import org.apache.ojb.broker.metadata.JdbcConnectionDescriptor;
035: import org.apache.ojb.broker.metadata.ObjectReferenceDescriptor;
036: import org.apache.ojb.broker.util.WrappedConnection;
037:
038: /**
039: * The implementation of {@link java.sql.Connection} which
040: * automatically gathers INSERT, UPDATE and DELETE
041: * PreparedStatements into batches.
042: *
043: * @author Oleg Nitz (<a href="mailto:olegnitz@apache.org">olegnitz@apache.org</a>)
044: * @version $Id: BatchConnection.java,v 1.20.2.1 2005/12/21 22:27:48 tomdz Exp $
045: */
046: public class BatchConnection extends WrappedConnection {
047: private static final int MAX_COUNT = 100;
048:
049: /**
050: * Maps PBKey to another HashMap,
051: * which maps table name to List of related tables (N:1 or 1:1)
052: */
053: private static HashMap _pbkeyToFKInfo = new HashMap();
054:
055: private boolean _useBatchInserts = true;
056: private HashMap _statements = new HashMap();
057: private ArrayList _order = new ArrayList();
058: private HashMap _fkInfo;
059: private HashSet _deleted;
060: private HashSet _dontInsert;
061: private HashSet _touched = new HashSet();
062: private int count = 0;
063: private JdbcConnectionDescriptor m_jcd;
064:
065: public BatchConnection(Connection conn, PersistenceBroker broker) {
066: super (conn);
067: m_jcd = broker.serviceConnectionManager()
068: .getConnectionDescriptor();
069: _fkInfo = (HashMap) _pbkeyToFKInfo.get(broker.getPBKey());
070: if (_fkInfo != null) {
071: return;
072: }
073:
074: DescriptorRepository repos = broker.getDescriptorRepository();
075: _fkInfo = new HashMap();
076: for (Iterator it = repos.iterator(); it.hasNext();) {
077: ClassDescriptor desc = (ClassDescriptor) it.next();
078: List ordList = desc.getObjectReferenceDescriptors();
079: if (!ordList.isEmpty()) {
080: HashSet fkTables = getFKTablesFor(desc
081: .getFullTableName());
082: for (Iterator it2 = ordList.iterator(); it2.hasNext();) {
083: ObjectReferenceDescriptor ord = (ObjectReferenceDescriptor) it2
084: .next();
085: ClassDescriptor oneDesc = repos
086: .getDescriptorFor(ord.getItemClass());
087: fkTables.addAll(getFullTableNames(oneDesc, repos));
088: }
089: }
090:
091: List codList = desc.getCollectionDescriptors();
092: for (Iterator it2 = codList.iterator(); it2.hasNext();) {
093: CollectionDescriptor cod = (CollectionDescriptor) it2
094: .next();
095: ClassDescriptor manyDesc = repos.getDescriptorFor(cod
096: .getItemClass());
097: if (cod.isMtoNRelation()) {
098: HashSet fkTables = getFKTablesFor(cod
099: .getIndirectionTable());
100: fkTables.addAll(getFullTableNames(desc, repos));
101: fkTables.addAll(getFullTableNames(manyDesc, repos));
102: } else {
103: HashSet manyTableNames = getFullTableNames(
104: manyDesc, repos);
105: for (Iterator it3 = manyTableNames.iterator(); it3
106: .hasNext();) {
107: HashSet fkTables = getFKTablesFor((String) it3
108: .next());
109: fkTables.addAll(getFullTableNames(desc, repos));
110: }
111: }
112: }
113: }
114: _pbkeyToFKInfo.put(broker.getPBKey(), _fkInfo);
115: }
116:
117: private HashSet getFKTablesFor(String tableName) {
118: HashSet fkTables = (HashSet) _fkInfo.get(tableName);
119:
120: if (fkTables == null) {
121: fkTables = new HashSet();
122: _fkInfo.put(tableName, fkTables);
123: }
124: return fkTables;
125: }
126:
127: private HashSet getFullTableNames(ClassDescriptor desc,
128: DescriptorRepository repos) {
129: String tableName;
130: HashSet tableNamesSet = new HashSet();
131: Collection extents = desc.getExtentClasses();
132:
133: tableName = desc.getFullTableName();
134: if (tableName != null) {
135: tableNamesSet.add(tableName);
136: }
137: for (Iterator it = extents.iterator(); it.hasNext();) {
138: Class extClass = (Class) it.next();
139: ClassDescriptor extDesc = repos.getDescriptorFor(extClass);
140: tableName = extDesc.getFullTableName();
141: if (tableName != null) {
142: tableNamesSet.add(tableName);
143: }
144: }
145: return tableNamesSet;
146: }
147:
148: public void setUseBatchInserts(boolean useBatchInserts) {
149: _useBatchInserts = useBatchInserts;
150: }
151:
152: /**
153: * Remember the order of execution
154: */
155: void nextExecuted(String sql) throws SQLException {
156: count++;
157:
158: if (_order.contains(sql)) {
159: return;
160: }
161:
162: String sqlCmd = sql.substring(0, 7);
163: String rest = sql.substring(sqlCmd.equals("UPDATE ") ? 7 // "UPDATE "
164: : 12); // "INSERT INTO " or "DELETE FROM "
165: String tableName = rest.substring(0, rest.indexOf(' '));
166: HashSet fkTables = (HashSet) _fkInfo.get(tableName);
167:
168: // we should not change order of INSERT/DELETE/UPDATE
169: // statements for the same table
170: if (_touched.contains(tableName)) {
171: executeBatch();
172: }
173: if (sqlCmd.equals("INSERT ")) {
174: if (_dontInsert != null && _dontInsert.contains(tableName)) {
175: // one of the previous INSERTs contained a table
176: // that references this table.
177: // Let's execute that previous INSERT right now so that
178: // in the future INSERTs into this table will go first
179: // in the _order array.
180: executeBatch();
181: }
182: } else
183: //if (sqlCmd.equals("DELETE ") || sqlCmd.equals("UPDATE "))
184: {
185: // We process UPDATEs in the same way as DELETEs
186: // because setting FK to NULL in UPDATE is equivalent
187: // to DELETE from the referential integrity point of view.
188:
189: if (_deleted != null && fkTables != null) {
190: HashSet intersection = (HashSet) _deleted.clone();
191:
192: intersection.retainAll(fkTables);
193: if (!intersection.isEmpty()) {
194: // one of the previous DELETEs contained a table
195: // that is referenced from this table.
196: // Let's execute that previous DELETE right now so that
197: // in the future DELETEs into this table will go first
198: // in the _order array.
199: executeBatch();
200: }
201: }
202: }
203:
204: _order.add(sql);
205:
206: _touched.add(tableName);
207: if (sqlCmd.equals("INSERT ")) {
208: if (fkTables != null) {
209: if (_dontInsert == null) {
210: _dontInsert = new HashSet();
211: }
212: _dontInsert.addAll(fkTables);
213: }
214: } else if (sqlCmd.equals("DELETE ")) {
215: if (_deleted == null) {
216: _deleted = new HashSet();
217: }
218: _deleted.add(tableName);
219: }
220: }
221:
222: /**
223: * If UPDATE, INSERT or DELETE, return BatchPreparedStatement,
224: * otherwise return null.
225: */
226: private PreparedStatement prepareBatchStatement(String sql) {
227: String sqlCmd = sql.substring(0, 7);
228:
229: if (sqlCmd.equals("UPDATE ") || sqlCmd.equals("DELETE ")
230: || (_useBatchInserts && sqlCmd.equals("INSERT "))) {
231: PreparedStatement stmt = (PreparedStatement) _statements
232: .get(sql);
233: if (stmt == null) {
234: // [olegnitz] for JDK 1.2 we need to list both PreparedStatement and Statement
235: // interfaces, otherwise proxy.jar works incorrectly
236: stmt = (PreparedStatement) Proxy.newProxyInstance(
237: getClass().getClassLoader(), new Class[] {
238: PreparedStatement.class,
239: Statement.class,
240: BatchPreparedStatement.class },
241: new PreparedStatementInvocationHandler(this ,
242: sql, m_jcd));
243: _statements.put(sql, stmt);
244: }
245: return stmt;
246: } else {
247: return null;
248: }
249: }
250:
251: public PreparedStatement prepareStatement(String sql)
252: throws SQLException {
253: PreparedStatement stmt = null;
254: stmt = prepareBatchStatement(sql);
255:
256: if (stmt == null) {
257: stmt = getDelegate().prepareStatement(sql);
258: }
259: return stmt;
260: }
261:
262: public PreparedStatement prepareStatement(String sql,
263: int resultSetType, int resultSetConcurrency)
264: throws SQLException {
265: PreparedStatement stmt = null;
266: stmt = prepareBatchStatement(sql);
267:
268: if (stmt == null) {
269: stmt = getDelegate().prepareStatement(sql, resultSetType,
270: resultSetConcurrency);
271: }
272: return stmt;
273: }
274:
275: public void executeBatch() throws SQLException {
276: BatchPreparedStatement batchStmt;
277: Connection conn = getDelegate();
278:
279: try {
280: for (Iterator it = _order.iterator(); it.hasNext();) {
281: batchStmt = (BatchPreparedStatement) _statements.get(it
282: .next());
283: batchStmt.doExecute(conn);
284: }
285: } finally {
286: _order.clear();
287:
288: if (_dontInsert != null) {
289: _dontInsert.clear();
290: }
291:
292: if (_deleted != null) {
293: _deleted.clear();
294: }
295: _touched.clear();
296: count = 0;
297: }
298: }
299:
300: public void executeBatchIfNecessary() throws SQLException {
301: if (count >= MAX_COUNT) {
302: executeBatch();
303: }
304: }
305:
306: public void clearBatch() {
307: _order.clear();
308: _statements.clear();
309:
310: if (_dontInsert != null) {
311: _dontInsert.clear();
312: }
313:
314: if (_deleted != null) {
315: _deleted.clear();
316: }
317: }
318:
319: public void commit() throws SQLException {
320: executeBatch();
321: _statements.clear();
322: getDelegate().commit();
323: }
324:
325: public void rollback() throws SQLException {
326: clearBatch();
327: getDelegate().rollback();
328: }
329: }
|