001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jca.mbean;
023:
024: import java.io.Serializable;
025: import java.util.Collections;
026: import java.util.HashMap;
027: import java.util.HashSet;
028: import java.util.Map;
029: import java.util.Set;
030:
031: import javax.naming.InitialContext;
032: import javax.resource.cci.Connection;
033: import javax.resource.cci.ConnectionFactory;
034: import javax.sql.DataSource;
035: import javax.transaction.Synchronization;
036: import javax.transaction.Transaction;
037: import javax.transaction.TransactionManager;
038:
039: import org.jboss.logging.Logger;
040: import org.jboss.tm.TxUtils;
041:
042: /**
043: * MultiThreaded Operations that can be executed concurrently.
044: *
045: * Based on Operation class.
046: *
047: * @author <a href="dimitris@jboss.org">Dimitris Andreadis</a>
048: * @version $Revision: 57211 $
049: */
050: public class MTOperation implements Serializable {
051: // Static Data ---------------------------------------------------
052:
053: /** The serialVersionUID */
054: private static final long serialVersionUID = 1L;
055:
056: /** Available Operations */
057: public static final int TM_GET_STATUS = 0;
058: public static final int TM_BEGIN = 1;
059: public static final int TM_SUSPEND = 2;
060: public static final int TM_RESUME = 3;
061: public static final int TM_COMMIT = 4;
062: public static final int TX_COMMIT = 5;
063: public static final int TX_REGISTER_SYNC = 6;
064:
065: public static final int CF_LOOKUP = 10;
066: public static final int CF_BY_TX_LOOKUP = 11;
067: public static final int CF_GET_CONN = 12;
068: public static final int CN_CLOSE_CONN = 13;
069:
070: public static final int DS_TEST_LOOKUP = 15;
071: public static final int DS_DEFAULT_LOOKUP = 16;
072: public static final int DS_GET_CONN = 17;
073: public static final int DS_CLOSE_CONN = 18;
074:
075: public static final int XX_SLEEP_200 = 20;
076: public static final int XX_SLEEP_RANDOM = 21;
077: public static final int XX_POST_SIGNAL = 22;
078: public static final int XX_WAIT_FOR_SIGNAL = 23;
079: public static final int XX_WAIT_FOR_TX = 24;
080: public static final int XX_WAIT_FOR_CONN = 25;
081:
082: /** The Logger */
083: protected static Logger log;
084:
085: /** TM instance */
086: protected static TransactionManager tm = null;
087:
088: /** Shared connections */
089: protected static Map connections = Collections
090: .synchronizedMap(new HashMap());
091:
092: /** Active Transactions */
093: protected static Map transactions = Collections
094: .synchronizedMap(new HashMap());
095:
096: /** Used for signaling between threads */
097: protected static Set signals = Collections
098: .synchronizedSet(new HashSet());
099:
100: /** Shared reference to a connection factory */
101: protected static ConnectionFactory cf = null;
102:
103: /**Shared reference to a DataSource */
104: protected static DataSource ds = null;
105:
106: /** Set when the first unexpected throwable is encounter in any thread */
107: protected static boolean testMarkedForExit;
108:
109: // Protected Data ------------------------------------------------
110:
111: /** An id for this transaction */
112: protected Integer id;
113:
114: /** The operation to execute */
115: protected int op;
116:
117: /** Set when an exception is expected */
118: protected Throwable throwable;
119:
120: // Static Methods ------------------------------------------------
121:
122: /**
123: * Setup static objects for the test
124: */
125: public static void init(Logger log) throws Exception {
126: MTOperation.log = log;
127:
128: if (getTM().getTransaction() != null) {
129: throw new IllegalStateException(
130: "Invalid thread association "
131: + getTM().getTransaction());
132: }
133: connections.clear();
134: transactions.clear();
135: signals.clear();
136:
137: // clear the exit flag
138: setTestMarkedForExit(false);
139: }
140:
141: /**
142: * Lazy TransactionManager lookup
143: */
144: public static TransactionManager getTM() throws Exception {
145: if (tm == null) {
146: tm = (TransactionManager) new InitialContext()
147: .lookup("java:/TransactionManager");
148: }
149: return tm;
150: }
151:
152: /**
153: * Cleanup
154: */
155: public static void destroy() {
156: connections.clear();
157: transactions.clear();
158: signals.clear();
159: }
160:
161: /**
162: * Returns true if the test is marked for exit
163: */
164: public static boolean isTestMarkedForExit() {
165: return testMarkedForExit;
166: }
167:
168: /**
169: * Tell the threads to exit
170: */
171: public static void setTestMarkedForExit(boolean testMarkedForExit) {
172: MTOperation.testMarkedForExit = testMarkedForExit;
173: }
174:
175: /**
176: * Used by waiting threads to stop execution
177: *
178: * @throws Exception if the test if marked to exit
179: */
180: public static void checkTestMarkedForExit() throws Exception {
181: if (testMarkedForExit) {
182: throw new MarkedForExitException();
183: }
184: }
185:
186: /**
187: * Exception used for early existing
188: */
189: private static class MarkedForExitException extends Exception {
190: // empty
191: }
192:
193: // Constructors --------------------------------------------------
194:
195: public MTOperation(int op) {
196: this (op, 0);
197: }
198:
199: public MTOperation(int op, int id) {
200: this .id = new Integer(id);
201: this .op = op;
202: }
203:
204: public MTOperation(int op, int id, Throwable throwable) {
205: this .id = new Integer(id);
206: this .op = op;
207: this .throwable = throwable;
208: }
209:
210: // Public Methods ------------------------------------------------
211:
212: public void perform() throws Exception {
213: Throwable caught = null;
214: try {
215: switch (op) {
216: case TM_GET_STATUS:
217: tmGetStatus();
218: break;
219:
220: case TM_BEGIN:
221: tmBegin();
222: break;
223:
224: case TM_SUSPEND:
225: tmSuspend();
226: break;
227:
228: case TM_RESUME:
229: tmResume();
230: break;
231:
232: case TM_COMMIT:
233: tmCommit();
234: break;
235:
236: case TX_COMMIT:
237: txCommit();
238: break;
239:
240: case TX_REGISTER_SYNC:
241: txRegisterSync();
242: break;
243:
244: case XX_SLEEP_200:
245: xxSleep200();
246: break;
247:
248: case XX_SLEEP_RANDOM:
249: xxSleepRandom();
250: break;
251:
252: case XX_POST_SIGNAL:
253: xxPostSignal();
254: break;
255:
256: case XX_WAIT_FOR_SIGNAL:
257: xxWaitForSignal();
258: break;
259:
260: case XX_WAIT_FOR_TX:
261: xxWaitForTx();
262: break;
263:
264: case XX_WAIT_FOR_CONN:
265: xxWaitForConn();
266: break;
267:
268: case CF_LOOKUP:
269: cfLookup();
270: break;
271:
272: case CF_BY_TX_LOOKUP:
273: cfByTxLookup();
274: break;
275:
276: case DS_TEST_LOOKUP:
277: dsTestLookup();
278: break;
279:
280: case DS_DEFAULT_LOOKUP:
281: dsDefaultLookup();
282: break;
283:
284: case DS_GET_CONN:
285: dsGetConn();
286: break;
287:
288: case DS_CLOSE_CONN:
289: dsCloseConn();
290: break;
291:
292: case CF_GET_CONN:
293: cfGetConn();
294: break;
295:
296: case CN_CLOSE_CONN:
297: cnCloseConn();
298: break;
299:
300: default:
301: throw new IllegalArgumentException("Invalid operation "
302: + op);
303: }
304: } catch (MarkedForExitException e) {
305: log.info(tid() + "Early exit");
306: return;
307: } catch (Throwable t) {
308: caught = t;
309: }
310:
311: // expected an exception but caught none
312: if (throwable != null && caught == null) {
313: setTestMarkedForExit(true);
314: throw new Exception("Expected throwable ", throwable);
315: }
316:
317: // expected an exception but caught the wrong one
318: if (throwable != null
319: && (throwable.getClass().isAssignableFrom(caught
320: .getClass())) == false) {
321: log.warn("Caught wrong throwable", caught);
322: setTestMarkedForExit(true);
323: throw new Exception("Expected throwable " + throwable
324: + " caught ", caught);
325: }
326:
327: // did not expect an exception bug caught one
328: if (throwable == null && caught != null) {
329: log.warn("Caught unexpected throwable", caught);
330: setTestMarkedForExit(true);
331: throw new Exception("Unexpected throwable ", caught);
332: }
333: }
334:
335: public void cfLookup() throws Exception {
336: log.info(tid() + " CF_LOOKUP");
337: InitialContext ctx = new InitialContext();
338: cf = (ConnectionFactory) ctx.lookup("java:JBossTestCF");
339: }
340:
341: public void cfByTxLookup() throws Exception {
342: log.info(tid() + " CF_BY_TX_LOOKUP");
343: InitialContext ctx = new InitialContext();
344: cf = (ConnectionFactory) ctx.lookup("java:JBossTestCFByTx");
345: }
346:
347: public void cfGetConn() throws Exception {
348: log.info(tid() + " CF_GET_CONN (" + id + ")");
349: Connection conn = cf.getConnection();
350: connections.put(id, conn);
351: }
352:
353: public void cnCloseConn() throws Exception {
354: log.info(tid() + " CN_CLOSE_CONN (" + id + ")");
355: Connection conn = (Connection) connections.get(id);
356: conn.close();
357: }
358:
359: public void dsTestLookup() throws Exception {
360: log.info(tid() + " DS_TEST_LOOKUP");
361: InitialContext ctx = new InitialContext();
362: ds = (DataSource) ctx.lookup("java:StatementTestsConnectionDS");
363:
364: }
365:
366: public void dsDefaultLookup() throws Exception {
367: log.info(tid() + " DS_DEFAULT_LOOKUP");
368: InitialContext ctx = new InitialContext();
369: ds = (DataSource) ctx.lookup("java:DefaultDS");
370: }
371:
372: public void dsGetConn() throws Exception {
373: log.info(tid() + " DS_GET_CONN (" + id + ")");
374: java.sql.Connection conn = ds.getConnection();
375: connections.put(id, conn);
376: }
377:
378: public void dsCloseConn() throws Exception {
379: log.info(tid() + " DS_CLOSE_CONN (" + id + ")");
380: java.sql.Connection conn = (java.sql.Connection) connections
381: .get(id);
382: conn.close();
383: }
384:
385: public void tmGetStatus() throws Exception {
386: log.info(tid() + " "
387: + TxUtils.getStatusAsString(getTM().getStatus()));
388: }
389:
390: public void tmBegin() throws Exception {
391: log.info(tid() + " TM_BEGIN (" + id + ")");
392: getTM().begin();
393: Transaction tx = getTM().getTransaction();
394: synchronized (transactions) {
395: transactions.put(id, tx);
396: transactions.notifyAll();
397: }
398: }
399:
400: public void tmSuspend() throws Exception {
401: log.info(tid() + " TM_SUSPEND (" + id + ")");
402: Transaction tx = getTM().suspend();
403: transactions.put(id, tx);
404: }
405:
406: public void tmResume() throws Exception {
407: log.info(tid() + " TM_RESUME (" + id + ")");
408: Transaction tx = (Transaction) transactions.get(id);
409: if (tx == null) {
410: throw new IllegalStateException("Tx not found:" + id);
411: } else {
412: getTM().resume(tx);
413: }
414: }
415:
416: public void tmCommit() throws Exception {
417: log.info(tid() + " TM_COMMIT");
418: getTM().commit();
419: }
420:
421: public void txCommit() throws Exception {
422: log.info(tid() + " TX_COMMIT (" + id + ")");
423: Transaction tx = (Transaction) transactions.get(id);
424: if (tx == null) {
425: throw new IllegalStateException("Tx not found: " + id);
426: } else {
427: tx.commit();
428: }
429: }
430:
431: public void txRegisterSync() throws Exception {
432: log.info(tid() + " TX_REGISTER_SYNC (" + id + ")");
433: Transaction tx = (Transaction) transactions.get(id);
434: if (tx == null) {
435: throw new IllegalStateException("Tx not found: " + id);
436: }
437: Synchronization sync = new Synchronization() {
438: public void beforeCompletion() {
439: log.info(tid() + " beforeCompletion() called");
440: }
441:
442: public void afterCompletion(int status) {
443: log.info(tid() + " afterCompletion("
444: + TxUtils.getStatusAsString(status)
445: + ") called");
446: }
447: };
448: tx.registerSynchronization(sync);
449: }
450:
451: public void xxWaitForTx() throws Exception {
452: log.info(tid() + " XX_WAIT_FOR_TX (" + id + ")");
453:
454: Transaction tx = (Transaction) transactions.get(id);
455: while (tx == null) {
456: checkTestMarkedForExit();
457:
458: log.info(tid() + " Sleeping for 100 msecs");
459:
460: synchronized (transactions) {
461: try {
462: transactions.wait(100);
463: } catch (InterruptedException ignore) {
464: }
465: }
466: tx = (Transaction) transactions.get(id);
467: }
468: log.info(tid() + " Got it");
469: }
470:
471: public void xxWaitForConn() throws Exception {
472: log.info(tid() + " XX_WAIT_FOR_CONN (" + id + ")");
473:
474: boolean contained = connections.containsKey(id);
475: while (contained == false) {
476: checkTestMarkedForExit();
477:
478: log.info(tid() + " Sleeping for 100 msecs");
479:
480: synchronized (connections) {
481: try {
482: connections.wait(100);
483: } catch (InterruptedException ignore) {
484: }
485: }
486: contained = connections.containsKey(id);
487: }
488: log.info(tid() + " Got it");
489: }
490:
491: public void xxSleep200() throws Exception {
492: log.info(tid() + " XX_SLEEP_200");
493: Thread.sleep(200);
494: }
495:
496: public void xxSleepRandom() throws Exception {
497: long random = Math.round((Math.random() * 100));
498: log.info(tid() + " XX_SLEEP_RANDOM (" + random + ")");
499: Thread.sleep(random);
500: }
501:
502: public void xxPostSignal() throws Exception {
503: log.info(tid() + " XX_POST_SIGNAL (" + id + ")");
504: synchronized (signals) {
505: signals.add(id);
506: signals.notifyAll();
507: }
508: }
509:
510: public void xxWaitForSignal() throws Exception {
511: log.info(tid() + " XX_WAIT_FOR_SIGNAL (" + id + ")");
512:
513: boolean posted = signals.contains(id);
514: while (posted == false) {
515: checkTestMarkedForExit();
516:
517: log.info(tid() + " Signal not posted, waiting...");
518:
519: synchronized (signals) {
520: try {
521: signals.wait(100);
522: } catch (InterruptedException ignore) {
523: }
524: }
525: posted = signals.contains(id);
526: }
527: log.info(tid() + " Got it!");
528: }
529:
530: private String tid() {
531: return Thread.currentThread().getName();
532: }
533:
534: }
|