001: /**
002: * Copyright (C) 2001-2004 France Telecom R&D
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package org.objectweb.speedo.runtime.concurrency;
018:
019: import org.objectweb.perseus.concurrency.lib.Semaphore;
020: import org.objectweb.speedo.SpeedoTestHelper;
021: import org.objectweb.speedo.api.SpeedoProperties;
022: import org.objectweb.speedo.pobjects.basic.BasicA;
023: import org.objectweb.util.monolog.api.BasicLevel;
024:
025: import java.util.ArrayList;
026: import java.util.Collection;
027: import java.util.Iterator;
028: import java.util.Properties;
029:
030: import javax.jdo.PersistenceManager;
031: import javax.jdo.Query;
032:
033: import junit.framework.Assert;
034:
035: /**
036: *
037: * @author S.Chassande-Barrioz
038: */
039: public class TestSeveralPMUsers extends SpeedoTestHelper {
040:
041: public TestSeveralPMUsers(String s) {
042: super (s);
043: }
044:
045: protected String getLoggerName() {
046: return LOG_NAME + ".rt.concurrency.TestSeveralPMUsers";
047: }
048:
049: public Properties getPMFProperties() {
050: Properties p = super .getPMFProperties();
051: p.setProperty(SpeedoProperties.JDO_OPTION_MULTITREADED, "true");
052: p.setProperty(SpeedoProperties.MAPPING_STRUCTURE,
053: SpeedoProperties.MAPPING_STRUCTURE_DD);
054: return p;
055: }
056:
057: public void test2thread1PMNoTx() {
058: final PersistenceManager pm = pmf.getPersistenceManager();
059: BasicA ba = new BasicA();
060: ba.writeF1("test2thread1PMNoTx.thread1");
061: ba.writeF2(2);
062: pm.makePersistent(ba);
063: final Object oid = pm.getObjectId(ba);
064: Thread t = new Thread(new Runnable() {
065: public void run() {
066: try {
067: BasicA _ba = ((BasicA) pm.getObjectById(oid, false));
068: _ba.readF1_F2();
069: _ba.writeF1("test2thread1PMNoTx.thread2");
070: } catch (Exception e) {
071: e.printStackTrace();
072: fail(e.getMessage());
073: }
074: }
075: });
076: t.start();
077: try {
078: t.join();
079: } catch (InterruptedException e) {
080: }
081: Assert.assertTrue("The thread2 is not finished", !t.isAlive());
082: pm.currentTransaction().begin();
083: pm.deletePersistent(ba);
084: pm.currentTransaction().commit();
085: pm.close();
086:
087: }
088:
089: public void test2thread1PMTx() {
090: final PersistenceManager pm = pmf.getPersistenceManager();
091: BasicA ba = new BasicA();
092: ba.writeF1("test2thread1PM.thread1");
093: ba.writeF2(1);
094: pm.currentTransaction().begin();
095: pm.makePersistent(ba);
096: final Object oid = pm.getObjectId(ba);
097: Thread t = new Thread(new Runnable() {
098: public void run() {
099: try {
100: BasicA _ba = ((BasicA) pm.getObjectById(oid, false));
101: _ba.readF1_F2();
102: _ba.writeF1("test2thread1PM.thread2");
103: _ba.writeF2(2);
104: pm.currentTransaction().commit();
105: } catch (Exception e) {
106: e.printStackTrace();
107: fail(e.getMessage());
108: }
109: }
110: });
111: t.start();
112: try {
113: t.join();
114: } catch (InterruptedException e) {
115: }
116: Assert.assertTrue("The thread2 is not finished", !t.isAlive());
117: pm.currentTransaction().begin();
118: Assert.assertEquals("Bad f2 value", 2, ba.readF2());
119: ba.writeF2(3);
120: pm.currentTransaction().commit();
121: pm.currentTransaction().begin();
122: pm.deletePersistent(ba);
123: pm.currentTransaction().commit();
124: pm.close();
125: }
126:
127: public void testManyThreadConcurrentData20th70ob30m0q0i() {
128: testManyThreadConcurrentData(20, 70, 30, 5, 5);
129: }
130:
131: public void testManyThreadConcurrentData70th70ob0m5q5i() {
132: testManyThreadConcurrentData(70, 70, 0, 5, 5);
133: }
134:
135: public void testManyThreadConcurrentData100th100ob10m5q5i() {
136: testManyThreadConcurrentData(100, 100, 10, 5, 5);
137: }
138:
139: public void testManyThreadConcurrentData(final int NB_THREAD,
140: final int NB_OBJECT, final int NB_MODIF,
141: final int NB_QUERY, final int INTERVAL) {
142: logger.log(BasicLevel.INFO, "testManyThreadConcurrentData: "
143: + " \n\tNB_THREAD=" + NB_THREAD + " \n\tNB_OBJECT="
144: + NB_OBJECT + " \n\tNB_MODIF=" + NB_MODIF
145: + " \n\tNB_QUERY=" + NB_QUERY + " \n\tINTERVAL="
146: + INTERVAL);
147: assertTrue(
148: "Bad configuration, the number of object must be greater or equal than the number of thread",
149: NB_OBJECT >= NB_THREAD);
150: final PersistenceManager pm = pmf.getPersistenceManager();
151: pm.currentTransaction().begin();
152: final Object[] oids = new Object[NB_OBJECT];
153: final Semaphore[] s = new Semaphore[NB_OBJECT];
154: for (int i = 0; i < NB_OBJECT; i++) {
155: BasicA ba = new BasicA();
156: ba.writeF1("testManyThreadConcurrentData_" + i);
157: ba.writeF2(i);
158: pm.makePersistent(ba);
159: oids[i] = pm.getObjectId(ba);
160: s[i] = new Semaphore();
161: }
162: pm.currentTransaction().commit();
163:
164: //clean the cache
165: pmf.getDataStoreCache().evictAll();
166:
167: Thread[] threads = new Thread[NB_THREAD];
168: ThreadTestManyThreadConcurrentData[] runners = new ThreadTestManyThreadConcurrentData[NB_THREAD];
169: for (int i = 0; i < NB_THREAD; i++) {
170: final int threadId = i;
171: threads[i] = new Thread(runners[i]);
172: runners[i] = new ThreadTestManyThreadConcurrentData(
173: threadId, NB_OBJECT, NB_MODIF, NB_QUERY, INTERVAL,
174: oids, s, pm, this );
175: }
176: pm.currentTransaction().begin();
177: for (int i = 0; i < NB_THREAD; i++) {
178: threads[i].start();
179: }
180: for (int i = 0; i < NB_THREAD; i++) {
181: try {
182: threads[i].join();
183: } catch (InterruptedException e) {
184: }
185: }
186: int error = 0;
187: for (int i = 0; i < NB_THREAD; i++) {
188: if (runners[i].throwable != null) {
189: logger.log(BasicLevel.ERROR, "Error in the thread " + i
190: + ": ", runners[i].throwable);
191: error++;
192: }
193: }
194: pm.currentTransaction().commit();
195: pm.currentTransaction().begin();
196: pm.deletePersistentAll(pm.getObjectsById(oids));
197: pm.currentTransaction().commit();
198: pm.close();
199: if (error > 0) {
200: fail(error + " error(s) occur.");
201: }
202: }
203: }
204:
205: class ThreadTestManyThreadConcurrentData implements Runnable {
206:
207: private final int threadId;
208: private final int NB_OBJECT;
209: private final int NB_MODIF;
210: private final int NB_QUERY;
211: private final int INTERVAL;
212: private final Object[] oids;
213: private final PersistenceManager pm;
214: private final Semaphore[] s;
215: private final SpeedoTestHelper st;
216: public Throwable throwable = null;
217:
218: public ThreadTestManyThreadConcurrentData(final int threadId,
219: final int NB_OBJECT, final int NB_MODIF,
220: final int NB_QUERY, final int INTERVAL,
221: final Object[] oids, final Semaphore[] s,
222: final PersistenceManager pm, final SpeedoTestHelper st) {
223: this .threadId = threadId;
224: this .NB_OBJECT = NB_OBJECT;
225: this .NB_MODIF = NB_MODIF;
226: this .NB_QUERY = NB_QUERY;
227: this .INTERVAL = INTERVAL;
228: this .oids = oids;
229: this .s = s;
230: this .pm = pm;
231: this .st = st;
232: }
233:
234: public void run() {
235: try {
236: execute();
237: } catch (Throwable t) {
238: throwable = t;
239: }
240: }
241:
242: private void execute() {
243: //test concurrent loading in the same transaction
244: BasicA ba = ((BasicA) pm.getObjectById(oids[NB_OBJECT - 1],
245: false));
246: ba.readF1_F2();
247:
248: ba = ((BasicA) pm.getObjectById(oids[threadId], false));
249: ba.writeF1(ba.readF1() + "modifiedBy" + threadId);
250:
251: for (int j = 0; j < NB_MODIF; j++) {
252: // Choose an object: alternativly on each side
253: // of the threadId
254: int id = j;
255: if (j % 2 == 0) {
256: id = Math.min(NB_OBJECT - 1, threadId + j);
257: } else {
258: id = Math.max(0, threadId - j);
259: }
260: ba = ((BasicA) pm.getObjectById(oids[id], false));
261: s[id].P();
262: try {
263: ba.writeF1(ba.readF1() + "modifiedBy" + threadId);
264: } finally {
265: s[id].V();
266: }
267: }
268:
269: for (int j = 0; j < NB_QUERY; j++) {
270: final int min = Math.max(0, threadId - (INTERVAL / 2));
271: final int max = Math.min(NB_OBJECT - 1, threadId
272: + (INTERVAL / 2));
273: Query q = pm.newQuery(BasicA.class);
274: q.declareParameters("int pmin, int pmax");
275: q.setFilter("(f2 >= pmin) && (f2 <= pmax)");
276: Collection c = (Collection) q.execute(new Integer(min),
277: new Integer(max));
278: Collection expected = new ArrayList();
279: for (int id = min; id <= max; id++) {
280: expected.add(new Integer(id));
281: }
282: st.getLogger().log(BasicLevel.DEBUG,
283: "Thread " + threadId + " expect: " + expected);
284: Collection found = new ArrayList();
285: for (Iterator iter = c.iterator(); iter.hasNext();) {
286: ba = (BasicA) iter.next();
287: int f2 = ba.readF2();
288: found.add(new Integer(f2));
289: }
290: q.closeAll();
291: st.getLogger().log(BasicLevel.DEBUG,
292: "Thread " + threadId + " found: " + found);
293: st.assertSameCollection("Bad query result" + ", threadId="
294: + threadId + ", min=" + min + ", max=" + max,
295: expected, found);
296: }
297: }
298: }
|