001: /*
002: * Copyright 2001-2007 Geert Bevin <gbevin[remove] at uwyn dot com>
003: * Distributed under the terms of either:
004: * - the common development and distribution license (CDDL), v1.0; or
005: * - the GNU Lesser General Public License, v2.1 or later
006: * $Id: TestDbConcurrency.java 3634 2007-01-08 21:42:24Z gbevin $
007: */
008: package com.uwyn.rife.database;
009:
010: import com.uwyn.rife.database.exceptions.DatabaseException;
011: import com.uwyn.rife.database.queries.CreateTable;
012: import com.uwyn.rife.database.queries.Delete;
013: import com.uwyn.rife.database.queries.DropTable;
014: import com.uwyn.rife.database.queries.Insert;
015: import com.uwyn.rife.database.queries.Select;
016: import com.uwyn.rife.tools.InnerClassException;
017: import java.sql.Connection;
018: import java.sql.ResultSet;
019: import java.sql.SQLException;
020: import java.sql.Types;
021: import java.util.ArrayList;
022: import junit.framework.TestCase;
023:
024: public class TestDbConcurrency extends TestCase {
025: public static boolean VERBOSE = false;
026: public static boolean DEBUG = false;
027:
028: private Datasource mDatasource = null;
029:
030: private static final Object sOutputLock = new Object();
031: private static final int sOutputLimit = 60;
032: private static int sOutputChars = 0;
033: private static int sConnectionOverload = 25;
034:
035: public TestDbConcurrency(Datasource datasource,
036: String datasourceName, String name) {
037: super (name);
038: mDatasource = datasource;
039: }
040:
041: private static void display(char display) {
042: if (VERBOSE) {
043: synchronized (sOutputLock) {
044: System.out.print(display);
045: sOutputChars++;
046: if (sOutputChars == sOutputLimit) {
047: sOutputChars = 0;
048: System.out.println();
049: }
050: }
051: }
052: }
053:
054: public static void displayCommit() {
055: display('v');
056: }
057:
058: public static void displayError() {
059: display('x');
060: }
061:
062: public void testConcurrency() {
063: Structure structure = new Structure(mDatasource);
064: try {
065: structure.install();
066: } catch (DatabaseException e) {
067: e.printStackTrace();
068: }
069:
070: if (VERBOSE) {
071: System.out.println();
072: }
073:
074: ArrayList<Concurrency> threads = new ArrayList<Concurrency>();
075: Integer mainlock = new Integer(0);
076: Concurrency concurrency = null;
077: for (int i = 1; i <= mDatasource.getPoolsize()
078: * sConnectionOverload; i++) {
079: concurrency = new Concurrency(mDatasource, mainlock);
080:
081: Thread thread = new Thread(concurrency, "example " + i);
082: thread.setDaemon(true);
083: threads.add(concurrency);
084: thread.start();
085: }
086:
087: boolean thread_alive = true;
088: boolean thread_advanced = false;
089: synchronized (mainlock) {
090: while (thread_alive) {
091: try {
092: mainlock.wait(10000);
093: } catch (InterruptedException e) {
094: Thread.yield();
095: }
096:
097: thread_alive = false;
098: thread_advanced = false;
099:
100: for (Concurrency thread : threads) {
101: if (thread.isAlive()) {
102: thread_alive = true;
103:
104: if (thread.hasAdvanced()) {
105: thread_advanced = true;
106: break;
107: }
108: }
109: }
110:
111: // if none of the threads has advanced
112: // throw an error and terminate all threads
113: if (thread_alive && !thread_advanced) {
114: for (Concurrency thread : threads) {
115: thread.terminate();
116: }
117:
118: thread_alive = false;
119: System.out.println();
120: System.out
121: .println("Concurrency deadlock for datasource with driver: '"
122: + mDatasource.getDriver() + "'.");
123: System.exit(1);
124: break;
125: }
126: }
127: }
128:
129: try {
130: structure.remove();
131: } catch (DatabaseException e) {
132: e.printStackTrace();
133: }
134:
135: mDatasource.cleanup();
136: }
137: }
138:
139: class Concurrency extends DbQueryManager implements Runnable {
140: private Object mMainlock = null;
141: private int mErrors = 0;
142: private int mCommits = 0;
143: private long mLastExecution = -1;
144: private boolean mAlive = false;
145:
146: public Concurrency(Datasource datasource, Object mainlock) {
147: super (datasource);
148:
149: mMainlock = mainlock;
150: }
151:
152: public void terminate() {
153: mAlive = false;
154: }
155:
156: public long getLastExecution() {
157: return mLastExecution;
158: }
159:
160: public boolean hasAdvanced() {
161: if (-1 == mLastExecution) {
162: return true;
163: }
164:
165: if (System.currentTimeMillis() < mLastExecution + (100 * 1000)) {
166: return true;
167: }
168:
169: return false;
170: }
171:
172: public boolean isAlive() {
173: if (mErrors >= 10 && mCommits >= 10) {
174: return false;
175: }
176:
177: return mAlive;
178: }
179:
180: public void run() {
181: mAlive = true;
182:
183: while (isAlive()) {
184: mLastExecution = System.currentTimeMillis();
185:
186: try {
187: doIt();
188: Thread.yield();
189: Thread.sleep((int) Math.random() * 200);
190: TestDbConcurrency.displayCommit();
191: mCommits++;
192: } catch (DatabaseException e) {
193: TestDbConcurrency.displayError();
194: mErrors++;
195: if (TestDbConcurrency.DEBUG) {
196: e.printStackTrace();
197: }
198: } catch (InterruptedException e) {
199: e.printStackTrace();
200: }
201: }
202:
203: mAlive = false;
204:
205: synchronized (mMainlock) {
206: mMainlock.notifyAll();
207: }
208: }
209:
210: public void doIt() throws DatabaseException {
211: if (TestDbConcurrency.DEBUG) {
212: System.out.println(Thread.currentThread().getName()
213: + " : begin");
214: }
215:
216: inTransaction(new DbTransactionUserWithoutResult() {
217:
218: public int getTransactionIsolation() {
219: if (getDatasource().getAliasedDriver().equals(
220: "org.apache.derby.jdbc.EmbeddedDriver")) {
221: return Connection.TRANSACTION_READ_UNCOMMITTED;
222: }
223: if (getDatasource().getAliasedDriver().equals(
224: "in.co.daffodil.db.jdbc.DaffodilDBDriver")) {
225: return Connection.TRANSACTION_SERIALIZABLE;
226: }
227:
228: return -1;
229: }
230:
231: public void useTransactionWithoutResult()
232: throws InnerClassException {
233: Insert insert = new Insert(getDatasource());
234: insert.into("example").fieldParameter("firstname")
235: .fieldParameter("lastname");
236: DbPreparedStatement insert_stmt = getConnection()
237: .getPreparedStatement(insert);
238: try {
239: insert_stmt.setString("firstname", "John");
240: if ((Math.random() * 100) <= 30) {
241: insert_stmt.setNull("lastname", Types.VARCHAR);
242: } else {
243: insert_stmt.setString("lastname", "Doe");
244: }
245: insert_stmt.executeUpdate();
246: insert_stmt.clearParameters();
247: insert_stmt.setString("firstname", "Jane");
248: insert_stmt.setString("lastname", "TheLane");
249: insert_stmt.executeUpdate();
250: } finally {
251: insert_stmt.close();
252: }
253:
254: Select select = new Select(getDatasource());
255: select.from("example").orderBy("firstname");
256: DbStatement select_stmt = executeQuery(select);
257: try {
258: Processor processor = new Processor();
259: while (fetch(select_stmt.getResultSet(), processor)
260: && processor.wasSuccessful()) {
261: processor.getFirstname();
262: processor.getLastname();
263: }
264: } finally {
265: select_stmt.close();
266: }
267: }
268: });
269:
270: if ((Math.random() * 100) <= 10) {
271: inTransaction(new DbTransactionUserWithoutResult() {
272:
273: public int getTransactionIsolation() {
274: if (getDatasource().getAliasedDriver().equals(
275: "org.apache.derby.jdbc.EmbeddedDriver")) {
276: return Connection.TRANSACTION_READ_UNCOMMITTED;
277: }
278: if (getDatasource().getAliasedDriver().equals(
279: "in.co.daffodil.db.jdbc.DaffodilDBDriver")) {
280: return Connection.TRANSACTION_SERIALIZABLE;
281: }
282:
283: return -1;
284: }
285:
286: public void useTransactionWithoutResult()
287: throws InnerClassException {
288: Delete delete = new Delete(getDatasource());
289: delete.from("example");
290: DbPreparedStatement delete_stmt = getConnection()
291: .getPreparedStatement(delete);
292: try {
293: delete_stmt.executeUpdate();
294: } finally {
295: delete_stmt.close();
296: }
297: if (TestDbConcurrency.DEBUG) {
298: System.out.println(Thread.currentThread()
299: .getName()
300: + " : deleted");
301: }
302: }
303: });
304: }
305:
306: if (TestDbConcurrency.DEBUG) {
307: System.out.println(Thread.currentThread().getName()
308: + " : comitted");
309: }
310: }
311: }
312:
313: class Processor extends DbRowProcessor {
314: private String mFirstname = null;
315: private String mLastname = null;
316:
317: public String getFirstname() {
318: return mFirstname;
319: }
320:
321: public String getLastname() {
322: return mLastname;
323: }
324:
325: public boolean processRow(ResultSet resultSet) throws SQLException {
326: mFirstname = resultSet.getString("firstname");
327: mLastname = resultSet.getString("lastname");
328:
329: return true;
330: }
331: }
332:
333: class Structure extends DbQueryManager {
334: public Structure(Datasource datasource) {
335: super (datasource);
336: }
337:
338: public void install() throws DatabaseException {
339: CreateTable create = new CreateTable(getDatasource());
340: create.table("example").column("firstname", String.class, 50,
341: CreateTable.NOTNULL).column("lastname", String.class,
342: 50, CreateTable.NOTNULL);
343: executeUpdate(create);
344: }
345:
346: public void remove() throws DatabaseException {
347: DropTable drop = new DropTable(getDatasource());
348: drop.table("example");
349: executeUpdate(drop);
350: }
351: }
|