001: /**
002: * Copyright (C) 2001-2004 France Telecom R&D
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package org.objectweb.speedo.runtime.concurrency;
018:
019: import org.objectweb.speedo.AbstractSpeedo;
020: import org.objectweb.speedo.SpeedoTestHelper;
021: import org.objectweb.speedo.api.ExceptionHelper;
022: import org.objectweb.speedo.pobjects.userid.BasicB;
023: import org.objectweb.speedo.pobjects.basic.BasicA;
024: import org.objectweb.util.monolog.api.BasicLevel;
025:
026: import javax.jdo.PersistenceManager;
027: import javax.jdo.JDOException;
028: import javax.jdo.JDOFatalException;
029: import javax.jdo.PersistenceManagerFactory;
030:
031: import junit.framework.Assert;
032:
033: import java.util.ArrayList;
034: import java.util.Arrays;
035: import java.util.Collection;
036: import java.util.HashSet;
037: import java.util.List;
038: import java.util.Map;
039: import java.util.Set;
040: import java.util.Vector;
041: import java.util.Iterator;
042:
043: /**
044: * Tests the high concurrency of the Speedo (more than 100 users)
045: *
046: * @author S.Chassande-Barrioz
047: */
048: public class TestManyUsers extends SpeedoTestHelper {
049:
050: public static final int W_ACTION = 1;
051: public static final int R_W_ACTION = 2;
052: public static final int MIXED_ACTION = 3;
053:
054: public static Vector rollbackExceptions = new Vector();
055: public static Vector errors = new Vector();
056:
057: public TestManyUsers(String s) {
058: super (s);
059: }
060:
061: protected String getLoggerName() {
062: return LOG_NAME + ".rt.concurrency.TestManyUsers";
063: }
064:
065: /**
066: * Tests the concurrency of the initialization of a class (mapping)
067: * (1000 users)
068: */
069: public void testLoading() {
070: final int nbThread = Integer
071: .getInteger(
072: "conform.org.objectweb.speedo.rt.concurrency.TestManyUsers.testLoading.thread",
073: 1000).intValue();
074: logger.log(BasicLevel.INFO, "Run concurrent Loading, "
075: + nbThread + " threads");
076: Thread[] ts = new Thread[nbThread];
077: for (int i = 0; i < nbThread; i++) {
078: ts[i] = new Thread(new Runnable() {
079: public void run() {
080: try {
081: PersistenceManager pm = pmf
082: .getPersistenceManager();
083: Thread.sleep(1);
084: pm.getObjectIdClass(BasicB.class);
085: pm.close();
086: Assert.assertTrue(true);
087: } catch (Exception e) {
088: e.printStackTrace();
089: fail(e.getMessage());
090: }
091: }
092: });
093: }
094: for (int i = 0; i < nbThread; i++) {
095: ts[i].start();
096: }
097: try {
098: for (int i = 0; i < nbThread; i++) {
099: ts[i].join();
100: }
101: } catch (InterruptedException e) {
102: fail(e.getMessage());
103: }
104: }
105:
106: /**
107: * Tests the concurrency of several writer on a same persistent object.
108: * (500 users). Action: a.writeF2(a.readF2() + 1)
109: */
110: public void testConcurrentReadAndWrite() {
111: logger.log(BasicLevel.INFO, "");
112: int nbThread = Integer
113: .getInteger(
114: "conform.org.objectweb.speedo.rt.concurrency.TestManyUsers.testConcurrentReadAndWrite.thread",
115: 200).intValue();
116: logger.log(BasicLevel.INFO,
117: "Run concurrent readAndWrite actions");
118: int[] actions = new int[10];
119: Arrays.fill(actions, R_W_ACTION);
120: testAccess(nbThread, actions);
121: }
122:
123: /**
124: * Tests the concurrency of several writer on a same persistent object.
125: * (500 users). Action = a.inc()
126: */
127: public void testConcurrentWrite() {
128: logger.log(BasicLevel.INFO, "");
129: int nbThread = Integer
130: .getInteger(
131: "conform.org.objectweb.speedo.rt.concurrency.TestManyUsers.testConcurrentWrite.thread",
132: 200).intValue();
133: logger.log(BasicLevel.INFO, "Run concurrent write actions");
134: int[] actions = new int[10];
135: Arrays.fill(actions, W_ACTION);
136: testAccess(nbThread, actions);
137: }
138:
139: /**
140: * Tests the concurrency of several writer on a same persistent object.
141: * (500 users). Action = a.inc()
142: */
143: public void testConcurrentMixed() {
144: logger.log(BasicLevel.INFO, "");
145: int nbThread = Integer
146: .getInteger(
147: "conform.org.objectweb.speedo.rt.concurrency.TestManyUsers.testConcurrentWrite.thread",
148: 200).intValue();
149: logger.log(BasicLevel.INFO, "Run concurrent mixed actions");
150: int[] actions = new int[10];
151: Arrays.fill(actions, MIXED_ACTION);
152: testAccess(nbThread, actions);
153: }
154:
155: /**
156: * Tests the concurrency of several writer on a same persistent object.
157: * (500 users)
158: */
159: public void testAccess(final int nbThread, final int[] actions) {
160: final int initialValue = 0;
161: rollbackExceptions.clear();
162: errors.clear();
163: Thread[] ts = new Thread[nbThread];
164: BasicA ba = new BasicA();
165: ba.writeF1("1");
166: ba.writeF2(initialValue);
167: PersistenceManager pm = pmf.getPersistenceManager();
168: pm.currentTransaction().begin();
169: pm.makePersistent(ba);
170: final BasicA a = ba;
171: Object id = pm.getObjectId(ba);
172: pm.currentTransaction().commit();
173: pm.close();
174: for (int i = 0; i < nbThread; i++) {
175: final int _i = i;
176: ts[i] = new Thread(new Runnable() {
177: public void run() {
178: int action = (_i % 2 == 0 ? W_ACTION : R_W_ACTION);
179: for (int j = 0; j < actions.length; j++) {
180: PersistenceManager pm = pmf
181: .getPersistenceManager();
182: boolean rollback = false;
183: try {
184: pm.currentTransaction().begin();
185: logger.log(BasicLevel.DEBUG, _i + "," + j
186: + " begin tx");
187: switch (actions[j]) {
188: case W_ACTION:
189: a.incF2();
190: break;
191: case R_W_ACTION:
192: a.writeF2(a.readF2() + 1);
193: break;
194: case MIXED_ACTION:
195: if (action == W_ACTION) {
196: a.incF2();
197: action = R_W_ACTION;
198: } else if (action == R_W_ACTION) {
199: a.writeF2(a.readF2() + 1);
200: action = W_ACTION;
201: }
202: break;
203: }
204: pm.currentTransaction().commit();
205: logger.log(BasicLevel.DEBUG, _i + "," + j
206: + " finished");
207: } catch (JDOFatalException e) {
208: rollback = true;
209: rollbackExceptions.add(e);
210: logger.log(BasicLevel.DEBUG, "Tx " + _i
211: + "," + j + " has been rolledback");
212: } catch (Exception e) {
213: Exception ie = ExceptionHelper.getNested(e);
214: errors.add(ie);
215: logger.log(BasicLevel.ERROR, _i + "," + j
216: + " has a problem", ie);
217: pm.currentTransaction().rollback();
218: } finally {
219: try {
220: pm.close();
221: } catch (JDOException e) {
222: logger
223: .log(
224: BasicLevel.ERROR,
225: "tx "
226: + _i
227: + ","
228: + j
229: + " has been "
230: + (rollback ? "rolledback"
231: : "committed")
232: + " and the close occurs an error",
233: ExceptionHelper
234: .getNested(e));
235: throw e;
236: }
237: }
238: }
239: }
240: });
241: }
242: long execTime = System.currentTimeMillis();
243: for (int i = 0; i < nbThread; i++) {
244: ts[i].start();
245: }
246: int val = 0;
247: ArrayList al = new ArrayList(nbThread);
248: try {
249: logger.log(BasicLevel.INFO, nbThread
250: + " threads launched doing " + actions.length
251: + " actions, waiting them ...");
252: for (int i = 0; i < nbThread; i++) {
253: ts[i].join(1000);
254: if (ts[i].isAlive()) {
255: al.add(new Integer(i));
256: logger.log(BasicLevel.DEBUG, i
257: + " is not finished after"
258: + " the delay, it could be blocked");
259: }
260: }
261: String dg = getDG(pmf);
262: if (dg != null) {
263: logger.log(BasicLevel.INFO, dg);
264: }
265: if (al.size() > 0) {
266: for (Iterator it = al.iterator(); it.hasNext();) {
267: int th = ((Integer) it.next()).intValue();
268: if (!ts[th].isAlive()) {
269: logger.log(BasicLevel.DEBUG, th
270: + " is late but ok.");
271: it.remove();
272: }
273: }
274: }
275: execTime = System.currentTimeMillis() - execTime;
276: if (al.size() > 0) {
277: logger.log(BasicLevel.INFO, "Kill alive threads");
278: for (Iterator it = al.iterator(); it.hasNext();) {
279: int th = ((Integer) it.next()).intValue();
280: if (!ts[th].isAlive()) {
281: it.remove();
282: } else {
283: try {
284: ts[th].interrupt();
285: } catch (Exception e1) {
286: e1.printStackTrace();
287: }
288: }
289: }
290: fail("Thread " + al + " blocked!");
291: }
292: } catch (InterruptedException e) {
293: //fail(e.getMessage());
294: } finally {
295: if (al.size() == 0) {
296: logger.log(BasicLevel.DEBUG, "Auto cleaning");
297: pm = pmf.getPersistenceManager();
298: ba = (BasicA) pm.getObjectById(id, false);
299: val = ba.readF2();
300: pm.currentTransaction().begin();
301: pm.deletePersistent(ba);
302: pm.currentTransaction().commit();
303: pm.close();
304: int nbCommittedTx = nbThread * actions.length
305: - errors.size() - rollbackExceptions.size();
306: logger
307: .log(
308: BasicLevel.INFO,
309: "Commited transaction rate: "
310: + ((nbCommittedTx * 100) / (nbThread * actions.length))
311: + "%"
312: + ", exec time: "
313: + execTime
314: + "ms"
315: + ", tx/s:"
316: + ((nbCommittedTx * 1000) / execTime));
317: }
318: }
319: if (errors.size() > 0) {
320: fail("There are " + errors.size() + "/" + nbThread
321: + " errors during the run");
322: }
323: if (al.size() == 0) {
324: Assert.assertEquals("Bad f2 value", initialValue + nbThread
325: * actions.length - rollbackExceptions.size(), val);
326: }
327: }
328:
329: private static String getDG(PersistenceManagerFactory pmf) {
330: Map m = null;
331: try {
332: m = ((AbstractSpeedo) pmf).getDependencyGraph()
333: .getVertexes();
334: } catch (Exception e) {
335: e.printStackTrace();
336: return null;
337: }
338: if (m.size() == 0) {
339: return null;
340: }
341: StringBuffer sb = new StringBuffer("dependency Graph: ");
342: List waiters = new ArrayList(m.keySet());
343: Set s = new HashSet(waiters.size() * 2);
344: s.addAll(m.keySet());
345: for (Iterator it = ((Collection) m.values()).iterator(); it
346: .hasNext();) {
347: s.addAll((Collection) it.next());
348: }
349: List all = new ArrayList(s);
350: for (int i = 0; i < all.size(); i++) {
351: Object t1 = all.get(i);
352: int t1Idx = all.indexOf(t1);
353: Collection dependencies = (Collection) m.get(t1);
354: if (dependencies != null) {
355: for (Iterator it = dependencies.iterator(); it
356: .hasNext();) {
357: sb.append("\nws");
358: sb.append(t1Idx);
359: sb.append(" = > ");
360: sb.append("ws");
361: sb.append(all.indexOf(it.next()));
362: }
363: }
364: }
365: return sb.toString();
366: }
367:
368: }
|