001: /**
002: * Copyright (C) 2001-2005 France Telecom R&D
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package org.objectweb.speedo.runtime.query;
018:
019: import org.objectweb.speedo.SpeedoTestHelper;
020: import org.objectweb.speedo.pobjects.basic.BasicA;
021: import org.objectweb.util.monolog.api.BasicLevel;
022: import org.objectweb.util.monolog.api.Logger;
023:
024: import java.util.ArrayList;
025: import java.util.Arrays;
026: import java.util.Collection;
027: import java.util.Iterator;
028: import java.util.List;
029:
030: import javax.jdo.JDOHelper;
031: import javax.jdo.PersistenceManager;
032: import javax.jdo.Query;
033:
034: /**
035: *
036: *
037: * @author S.Chassande-Barrioz
038: */
039: public class TestCreateDeleteDuringQueries extends SpeedoTestHelper {
040:
041: private static long queryExecTime = 0;
042: private static long nbqueryok = 0;
043: private static long nbcreatedone = 0;
044: private static long nbdeletedone = 0;
045: private static long errors = 0;
046: private static long nbprint = 500;
047: private static int nbRunnerThread = 0;
048:
049: public static final List pos = new ArrayList();
050: public final List removed = new ArrayList();
051: public final static int[] finishedThreads = new int[1];
052:
053: private synchronized static void queryError(Logger logger,
054: Throwable t) {
055: logger.log(BasicLevel.ERROR, "", t);
056: errors++;
057: }
058:
059: private synchronized static void queryOk(Logger logger, int threadId) {
060: nbqueryok++;
061: if ((nbqueryok % nbprint) == 0) {
062: printStat(logger, threadId);
063: }
064: }
065:
066: private static void printStat(Logger logger, int threadId) {
067: int size = 0;
068: synchronized (pos) {
069: size = pos.size();
070: }
071: logger.log(BasicLevel.INFO, "Thread " + threadId + ": "
072: + " / Que:" + nbqueryok + " / Cre:" + nbcreatedone
073: + " / Del:" + nbdeletedone + " / PO:" + size
074: + " / delta:" + (nbcreatedone - nbdeletedone - size)
075: + " / Err:" + errors + " / F:" + finishedThreads[0]
076: + "/" + nbRunnerThread);
077: }
078:
079: private synchronized static void creationDone(Logger logger,
080: int threadId) {
081: nbcreatedone++;
082: if ((nbcreatedone % nbprint) == 0) {
083: printStat(logger, threadId);
084: }
085: }
086:
087: private synchronized static void deletionDone(Logger logger,
088: int threadId) {
089: nbdeletedone++;
090: if ((nbdeletedone % nbprint) == 0) {
091: printStat(logger, threadId);
092: }
093: }
094:
095: public TestCreateDeleteDuringQueries() {
096: super ("TestCreateDeleteDuringQueries");
097: }
098:
099: public TestCreateDeleteDuringQueries(String n) {
100: super (n);
101: }
102:
103: protected String getLoggerName() {
104: return SpeedoTestHelper.LOG_NAME
105: + ".query.TestCreateDeleteDuringQueries";
106: }
107:
108: public void testCreateDeleteDuringQueries1() {
109: final int createNbThread = getIntProperty(getLoggerName()
110: + ".create.nbthread", 5);
111: final int createNbLoop = getIntProperty(getLoggerName()
112: + ".create.loop", 1000);
113: final int deleteNbThread = getIntProperty(getLoggerName()
114: + ".delete.nbthread", 5);
115: final int deleteNbLoop = getIntProperty(getLoggerName()
116: + ".delete.loop", 1000);
117: final int queryNbThread = getIntProperty(getLoggerName()
118: + ".query.nbthread", 3);
119: testCreateDeleteDuringQueries1(createNbThread, createNbLoop,
120: deleteNbThread, deleteNbLoop, queryNbThread);
121: }
122:
123: public void testCreateDeleteDuringQueries1(
124: final int createNbThread, final int createNbLoop,
125: final int deleteNbThread, final int deleteNbLoop,
126: final int queryNbThread) {
127: logger.log(BasicLevel.INFO, "TestCreateDeleteDuringQueries1: "
128: + "\n\t-create: " + createNbThread
129: + " thread(s) will do " + createNbLoop + " action(s)"
130: + "\n\t-delete: " + deleteNbThread
131: + " thread(s) will do " + deleteNbLoop + " action(s)"
132: + "\n\t-query: " + queryNbThread + " thread(s)");
133: int nbTotalThread = createNbThread + deleteNbThread
134: + queryNbThread;
135: nbRunnerThread = createNbThread + deleteNbThread;
136: Thread[] ts = new Thread[nbTotalThread];
137: finishedThreads[0] = 0;
138: int thcounter = 0;
139: for (int threadType = 0; threadType < 3; threadType++) {
140: switch (threadType) {
141: case 0:
142: for (int i = 0; i < createNbThread; i++) {
143: final int threadId = thcounter;
144: ts[thcounter] = new Thread(new Runnable() {
145: public void run() {
146: try {
147: for (int k = 0; k < createNbLoop; k++) {
148: create(threadId);
149: }
150: } finally {
151: logger.log(BasicLevel.DEBUG, "thread: "
152: + threadId
153: + " has finished to create");
154: synchronized (finishedThreads) {
155: finishedThreads[0]++;
156: }
157: }
158: }
159: });
160: thcounter++;
161: }
162: break;
163:
164: case 1:
165: for (int i = 0; i < deleteNbThread; i++) {
166: final int threadId = thcounter;
167: ts[thcounter] = new Thread(new Runnable() {
168: public void run() {
169: try {
170: for (int k = 0; k < createNbLoop; k++) {
171: delete(threadId);
172: }
173: } finally {
174: logger.log(BasicLevel.DEBUG, "thread: "
175: + threadId
176: + " has finished to delete");
177: synchronized (finishedThreads) {
178: finishedThreads[0]++;
179: }
180: }
181: }
182: });
183: thcounter++;
184: }
185: break;
186: case 2:
187: for (int i = 0; i < queryNbThread; i++) {
188: final int threadId = thcounter;
189: ts[thcounter] = new Thread(new Runnable() {
190: public void run() {
191: int fts = 0;
192: do {
193: synchronized (finishedThreads) {
194: fts = finishedThreads[0];
195: }
196: query(threadId);
197: } while (fts < (createNbThread + deleteNbThread));
198: }
199: });
200: thcounter++;
201: }
202: break;
203: }
204: }
205: //shuffle the array of thread
206: List l = Arrays.asList(ts);
207: //Collections.shuffle(l);
208: ts = (Thread[]) l.toArray(new Thread[ts.length]);
209:
210: //Start threads
211: long exectime = System.currentTimeMillis();
212: for (int i = 0; i < nbTotalThread; i++) {
213: ts[i].start();
214: }
215: try {
216: for (int i = 0; i < nbTotalThread; i++) {
217: ts[i].join();
218: }
219: } catch (InterruptedException e) {
220: fail(e.getMessage());
221: }
222: exectime = System.currentTimeMillis() - exectime;
223: long nbRunQuery = nbqueryok + errors;
224: logger.log(BasicLevel.INFO, "Query successed: "
225: + ((nbqueryok * 100) / nbRunQuery) + "% (" + nbqueryok
226: + "/" + nbRunQuery + ", " + (nbRunQuery - nbqueryok)
227: + " error)");
228: logger.log(BasicLevel.INFO, "Query average execution time: "
229: + (queryExecTime / nbqueryok) + "ms");
230: logger.log(BasicLevel.INFO, "Rate: "
231: + ((nbqueryok * 1000) / exectime) + " query/sec");
232: if (errors > 0) {
233: fail(errors + " errors occured!");
234: }
235: }
236:
237: boolean run = true;
238: Object o = new Object();
239:
240: private void stop() {
241: synchronized (o) {
242: run = false;
243: }
244: }
245:
246: private synchronized void start() {
247: synchronized (o) {
248: run = true;
249: o.notifyAll();
250: }
251: }
252:
253: private void again() {
254: synchronized (o) {
255: while (!run) {
256: try {
257: o.wait();
258: } catch (InterruptedException e) {
259: }
260: }
261: }
262: }
263:
264: public void create(final int threadId) {
265: again();
266: PersistenceManager pm = pmf.getPersistenceManager();
267: pm.currentTransaction().begin();
268: BasicA po = new BasicA();
269: pm.makePersistent(po);
270: po.setUndeclaredField(pm.getObjectId(po).toString());
271: pm.currentTransaction().commit();
272: pm.close();
273: try {
274: Thread.sleep(10);
275: } catch (InterruptedException e) {
276: }
277: creationDone(logger, threadId);
278: synchronized (pos) {
279: pos.add(po);
280: pos.notifyAll();
281: }
282: }
283:
284: public void delete(final int threadId) {
285: again();
286: BasicA po = null;
287: synchronized (pos) {
288: while (pos.size() == 0) {
289: try {
290: pos.wait();
291: } catch (InterruptedException e) {
292: }
293: }
294: po = (BasicA) pos.remove(0);
295: }
296: if (po != null) {
297: PersistenceManager pm = pmf.getPersistenceManager();
298: pm.currentTransaction().begin();
299: po.writeF1();
300: Object oid = pm.getObjectId(po);
301: pm.deletePersistent(po);
302: synchronized (removed) {
303: removed.add(oid);
304: }
305: pm.currentTransaction().commit();
306: pm.close();
307: deletionDone(logger, threadId);
308: }
309: }
310:
311: public void query(final int threadId) {
312: again();
313: PersistenceManager pm = pmf.getPersistenceManager();
314: pm.currentTransaction().begin();
315: Query q = null;
316: ArrayList oids = new ArrayList();
317: Object po = null;
318: try {
319: q = pm.newQuery(BasicA.class);
320: Collection res = (Collection) q.execute();
321: for (Iterator iter = res.iterator(); iter.hasNext();) {
322: po = iter.next();
323: assertNotNull("Result element is null", po);
324: Object oid = pm.getObjectId(po);
325: boolean alreadyInRemoved = false;
326: synchronized (removed) {
327: alreadyInRemoved = removed.contains(oid);
328: }
329: oids.add(oid);
330: assertTrue("Object already removed !!!!",
331: !alreadyInRemoved);
332: po = null;
333: }
334: q.closeAll();
335: q = null;
336: pm.currentTransaction().commit();
337: pm.close();
338: pm = null;
339: queryOk(logger, threadId);
340: } catch (Throwable t) {
341: //stop();
342: queryError(logger, t);
343: try {
344: JDOHelper.isPersistent(po);
345: } catch (Throwable _t) {
346: }
347: if (q != null) {
348: q.closeAll();
349: }
350: if (pm != null) {
351: if (pm.currentTransaction().isActive()) {
352: pm.currentTransaction().rollback();
353: }
354: pm.close();
355: pm = null;
356: }
357: }
358: oids.clear();
359: }
360: }
|