001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jca.adapter;
023:
024: import java.io.PrintWriter;
025: import java.util.ArrayList;
026: import java.util.Collection;
027: import java.util.HashSet;
028: import java.util.Iterator;
029: import java.util.Map;
030:
031: import javax.resource.ResourceException;
032: import javax.resource.spi.ConnectionEvent;
033: import javax.resource.spi.ConnectionEventListener;
034: import javax.resource.spi.ConnectionRequestInfo;
035: import javax.resource.spi.LocalTransaction;
036: import javax.resource.spi.ManagedConnection;
037: import javax.resource.spi.ManagedConnectionMetaData;
038: import javax.resource.spi.ResourceAdapterInternalException;
039: import javax.security.auth.Subject;
040: import javax.transaction.xa.XAException;
041: import javax.transaction.xa.XAResource;
042: import javax.transaction.xa.Xid;
043:
044: import org.jboss.logging.Logger;
045: import org.jboss.tm.TxUtils;
046:
047: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
048:
049: /**
050: * TestManagedConnection.java
051: *
052: * @author <a href="mailto:d_jencks@users.sourceforge.net">David Jencks</a>
053: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
054: * @version <tt>$Revision: 57211 $</tt>
055: */
056: public class TestManagedConnection implements ManagedConnection,
057: XAResource, LocalTransaction {
058: public static final String STARTED = "STARTED";
059: public static final String SUSPENDED = "SUSPENDED";
060: public static final String ENDED = "ENDED";
061: public static final String PREPARED = "PREPARED";
062:
063: public static final String LOCAL_NONE = "LOCAL_NONE";
064: public static final String LOCAL_TRANSACTION = "LOCAL_TRANSACTION";
065: public static final String LOCAL_COMMITTED = "LOCAL_COMMITTED";
066: public static final String LOCAL_ROLLEDBACK = "LOCAL_ROLLEDBACK";
067:
068: private final int id;
069:
070: private Logger log = Logger.getLogger(getClass());
071: private TestManagedConnectionFactory mcf;
072: private HashSet handles = new HashSet();
073: private HashSet listeners = new HashSet();
074:
075: private GlobalXID currentXid;
076:
077: private SynchronizedBoolean destroyed = new SynchronizedBoolean(
078: false);
079:
080: private boolean failInPrepare = false;
081: private boolean failInCommit = false;
082: private static boolean failInStart = false;
083: private static boolean failInEnd = false;
084:
085: private static int xaCode;
086:
087: private String localState = LOCAL_NONE;
088:
089: public static void setFailInEnd(boolean fie, int xa) {
090: failInEnd = fie;
091: xaCode = xa;
092:
093: }
094:
095: public static void setFailInStart(boolean fis, int xa) {
096: failInStart = fis;
097: xaCode = xa;
098: }
099:
100: public TestManagedConnection(
101: final TestManagedConnectionFactory mcf,
102: final Subject subject, final TestConnectionRequestInfo cri,
103: final int id) {
104: this .mcf = mcf;
105: this .id = id;
106: }
107:
108: void setFailInPrepare(final boolean fail, final int xaCode) {
109: this .failInPrepare = fail;
110: this .xaCode = xaCode;
111: }
112:
113: void setFailInCommit(final boolean fail, final int xaCode) {
114: this .failInCommit = fail;
115: this .xaCode = xaCode;
116: }
117:
118: // implementation of javax.resource.spi.ManagedConnection interface
119:
120: public synchronized void destroy() throws ResourceException {
121: log.info("Destroying connection: " + this );
122: if (destroyed.get())
123: return;
124: cleanup();
125: destroyed.set(true);
126: currentXid = null;
127: }
128:
129: public synchronized void cleanup() throws ResourceException {
130: log.info("cleanup: " + this + " handles=" + handles);
131:
132: checkDestroyedResourceException();
133: for (Iterator i = handles.iterator(); i.hasNext();) {
134: TestConnection c = (TestConnection) i.next();
135: c.setMc(null);
136: i.remove();
137: }
138: }
139:
140: public synchronized Object getConnection(Subject param1,
141: ConnectionRequestInfo param2) throws ResourceException {
142: log.info("getConnection " + this );
143:
144: checkDestroyedResourceException();
145:
146: if (param2 != null
147: && ((TestConnectionRequestInfo) param2).failure
148: .equals("getConnectionResource"))
149: throw new ResourceException(this .toString());
150: if (param2 != null
151: && ((TestConnectionRequestInfo) param2).failure
152: .equals("getConnectionRuntime"))
153: throw new RuntimeException(this .toString());
154: TestConnection c = new TestConnection(this );
155: handles.add(c);
156: return c;
157: }
158:
159: public synchronized void associateConnection(Object p)
160: throws ResourceException {
161: log.info("associateConnecton " + this + " connection=" + p);
162:
163: checkDestroyedResourceException();
164:
165: if (p instanceof TestConnection) {
166: ((TestConnection) p).setMc(this );
167: handles.add(p);
168: } else {
169: throw new ResourceException("wrong kind of Connection " + p);
170: }
171: }
172:
173: public synchronized void addConnectionEventListener(
174: ConnectionEventListener cel) {
175: log.info("addCEL: " + this + " " + cel);
176: listeners.add(cel);
177: }
178:
179: public synchronized void removeConnectionEventListener(
180: ConnectionEventListener cel) {
181: log.info("removeCEL: " + this + " " + cel);
182: listeners.remove(cel);
183: }
184:
185: public synchronized XAResource getXAResource()
186: throws ResourceException {
187: checkDestroyedResourceException();
188: return this ;
189: }
190:
191: public LocalTransaction getLocalTransaction()
192: throws ResourceException {
193: return this ;
194: }
195:
196: public ManagedConnectionMetaData getMetaData()
197: throws ResourceException {
198: return null;
199: }
200:
201: public void setLogWriter(PrintWriter param1)
202: throws ResourceException {
203: }
204:
205: public PrintWriter getLogWriter() throws ResourceException {
206: return null;
207: }
208:
209: // implementation of javax.transaction.xa.XAResource interface
210:
211: public void start(Xid xid, int flags) throws XAException {
212: long sleepInStart = mcf.getSleepInStart();
213: if (flags == TMNOFLAGS && sleepInStart != 0)
214: doSleep(sleepInStart);
215:
216: synchronized (this ) {
217: if (failInStart) {
218: XAException xaex = new XAException(xaCode + "for"
219: + this );
220: broadcastConnectionError(xaex);
221: throw new XAException(xaCode + "for" + this );
222: }
223:
224: GlobalXID gid = new GlobalXID(xid);
225: String flagString = TxUtils
226: .getXAResourceFlagsAsString(flags);
227: log.info("start with xid=" + gid + " flags=" + flagString
228: + " for " + this );
229: checkDestroyedXAException();
230: Map xids = getXids();
231: synchronized (xids) {
232: String state = (String) xids.get(gid);
233: if (state == null && flags != TMNOFLAGS)
234: throw new XAException("Invalid start state="
235: + state + " xid=" + gid + " flags="
236: + flagString + " for " + this );
237: if (state != null
238: && state != SUSPENDED
239: && state != ENDED
240: && (state != STARTED || ((flags & TMJOIN) == 0))
241: && (state != STARTED || ((flags & TMRESUME) == 0)))
242: throw new XAException("Invalid start state="
243: + state + " xid=" + gid + " flags="
244: + flagString + " for " + this );
245: if ((flags & TMJOIN) != 0 && mcf.failJoin)
246: throw new XAException("Join is not allowed "
247: + state + " xid=" + gid + " flags="
248: + flagString + " for " + this );
249: xids.put(gid, STARTED);
250: }
251:
252: this .currentXid = gid;
253: }
254: }
255:
256: public void end(final Xid xid, final int flags) throws XAException {
257: if (failInEnd) {
258: XAException xaex = new XAException(xaCode + "for" + this );
259: broadcastConnectionError(xaex);
260: throw new XAException(xaCode + "for" + this );
261: }
262:
263: long sleepInEnd = mcf.getSleepInEnd();
264: if (flags != TMSUCCESS && sleepInEnd != 0)
265: doSleep(sleepInEnd);
266:
267: synchronized (this ) {
268: GlobalXID gid = new GlobalXID(xid);
269: String flagString = TxUtils
270: .getXAResourceFlagsAsString(flags);
271: log.info("end with xid=" + gid + " flags=" + flagString
272: + " for " + this );
273: checkDestroyedXAException();
274: Map xids = getXids();
275: synchronized (xids) {
276: String state = (String) xids.get(gid);
277: if (state != STARTED && state != SUSPENDED
278: && state != ENDED)
279: throw new XAException("Invalid end state=" + state
280: + " xid=" + gid + " " + this );
281: if ((flags & TMSUSPEND) == 0)
282: xids.put(gid, ENDED);
283: else
284: xids.put(gid, SUSPENDED);
285: }
286:
287: this .currentXid = null;
288: }
289: }
290:
291: public synchronized void commit(Xid xid, boolean onePhase)
292: throws XAException {
293: GlobalXID gid = new GlobalXID(xid);
294: log.info("commit with xid=" + gid + " onePhase=" + onePhase
295: + " for " + this );
296: checkDestroyedXAException();
297: if (failInCommit)
298: throw new XAException(xaCode + " for " + this );
299: Map xids = getXids();
300: synchronized (xids) {
301: String state = (String) xids.get(gid);
302: if (onePhase) {
303: if (state != SUSPENDED && state != ENDED)
304: throw new XAException(
305: "Invalid one phase commit state=" + state
306: + " xid=" + gid + " " + this );
307: } else {
308: if (state != PREPARED)
309: throw new XAException(
310: "Invalid two phase commit state=" + state
311: + " xid=" + gid + " " + this );
312: }
313: xids.remove(gid);
314: }
315: }
316:
317: public synchronized void rollback(Xid xid) throws XAException {
318: GlobalXID gid = new GlobalXID(xid);
319: log.info("rollback with xid=" + gid + " for " + this );
320: checkDestroyedXAException();
321: Map xids = getXids();
322: synchronized (xids) {
323: String state = (String) xids.get(gid);
324: if (state != SUSPENDED && state != ENDED
325: && state != PREPARED)
326: throw new XAException("Invalid rollback state=" + state
327: + " xid=" + gid + " " + this );
328: xids.remove(gid);
329: }
330: }
331:
332: public synchronized int prepare(Xid xid) throws XAException {
333: GlobalXID gid = new GlobalXID(xid);
334: log.info("prepare with xid=" + gid + " for " + this );
335: checkDestroyedXAException();
336: Map xids = getXids();
337: synchronized (xids) {
338: String state = (String) xids.get(gid);
339: if (state != SUSPENDED && state != ENDED)
340: throw new XAException("Invalid prepare state=" + state
341: + " xid=" + gid + " " + this );
342: if (failInPrepare)
343: throw new XAException(xaCode + " for " + this );
344: xids.put(gid, PREPARED);
345: return XA_OK;
346: }
347: }
348:
349: public synchronized void forget(Xid xid) throws XAException {
350: GlobalXID gid = new GlobalXID(xid);
351: log.info("forget with xid=" + gid + " for " + this );
352: checkDestroyedXAException();
353: Map xids = getXids();
354: synchronized (xids) {
355: xids.remove(gid);
356: }
357: }
358:
359: public Xid[] recover(int param1) throws XAException {
360: return null;
361: }
362:
363: public boolean isSameRM(XAResource xar) throws XAException {
364: if (xar == null
365: || xar instanceof TestManagedConnection == false)
366: return false;
367: TestManagedConnection other = (TestManagedConnection) xar;
368: return (mcf == other.mcf);
369: }
370:
371: public int getTransactionTimeout() throws XAException {
372: return 0;
373: }
374:
375: public boolean setTransactionTimeout(int param1) throws XAException {
376: return false;
377: }
378:
379: public String getLocalState() {
380: return localState;
381: }
382:
383: public void begin() throws ResourceException {
384: localState = LOCAL_TRANSACTION;
385: }
386:
387: public void sendBegin() throws ResourceException {
388: begin();
389: ConnectionEvent event = new ConnectionEvent(this ,
390: ConnectionEvent.LOCAL_TRANSACTION_STARTED);
391: Collection copy = new ArrayList(listeners);
392: for (Iterator i = copy.iterator(); i.hasNext();) {
393: ConnectionEventListener cel = (ConnectionEventListener) i
394: .next();
395: try {
396: cel.localTransactionStarted(event);
397: } catch (Throwable ignored) {
398: log.warn("Ignored", ignored);
399: }
400: }
401: }
402:
403: public void commit() throws ResourceException {
404: localState = LOCAL_COMMITTED;
405: }
406:
407: public void sendCommit() throws ResourceException {
408: commit();
409:
410: ConnectionEvent event = new ConnectionEvent(this ,
411: ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
412: Collection copy = new ArrayList(listeners);
413: for (Iterator i = copy.iterator(); i.hasNext();) {
414: ConnectionEventListener cel = (ConnectionEventListener) i
415: .next();
416: try {
417: cel.localTransactionCommitted(event);
418: } catch (Throwable ignored) {
419: log.warn("Ignored", ignored);
420: }
421: }
422: }
423:
424: public void rollback() throws ResourceException {
425: localState = LOCAL_ROLLEDBACK;
426: }
427:
428: public void sendRollback() throws ResourceException {
429: rollback();
430:
431: ConnectionEvent event = new ConnectionEvent(this ,
432: ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
433: Collection copy = new ArrayList(listeners);
434: for (Iterator i = copy.iterator(); i.hasNext();) {
435: ConnectionEventListener cel = (ConnectionEventListener) i
436: .next();
437: try {
438: cel.localTransactionRolledback(event);
439: } catch (Throwable ignored) {
440: log.warn("Ignored", ignored);
441: }
442: }
443: }
444:
445: synchronized boolean isInTx() {
446: log.info("isInTx: " + this );
447: return currentXid != null;
448: }
449:
450: Map getXids() {
451: return mcf.getXids();
452: }
453:
454: void connectionClosed(TestConnection handle) {
455: if (destroyed.get())
456: return;
457:
458: log.info("Connetion closed handle=" + handle + " for " + this );
459:
460: ConnectionEvent ce = new ConnectionEvent(this ,
461: ConnectionEvent.CONNECTION_CLOSED);
462: ce.setConnectionHandle(handle);
463: Collection copy = new ArrayList(listeners);
464: for (Iterator i = copy.iterator(); i.hasNext();) {
465: log.info("notifying 1 cel connectionClosed");
466: ConnectionEventListener cel = (ConnectionEventListener) i
467: .next();
468: try {
469: cel.connectionClosed(ce);
470: } catch (Throwable ignored) {
471: log.warn("Ignored", ignored);
472: }
473: }
474: synchronized (this ) {
475: handles.remove(handle);
476: }
477: }
478:
479: protected void broadcastConnectionError(Throwable e) {
480: if (destroyed.get())
481: return;
482:
483: Exception ex = null;
484: if (e instanceof Exception)
485: ex = (Exception) e;
486: else
487: ex = new ResourceAdapterInternalException(
488: "Unexpected error", e);
489: ConnectionEvent ce = new ConnectionEvent(this ,
490: ConnectionEvent.CONNECTION_ERROR_OCCURRED, ex);
491: Collection copy = null;
492: synchronized (listeners) {
493: copy = new ArrayList(listeners);
494: }
495: for (Iterator i = copy.iterator(); i.hasNext();) {
496: ConnectionEventListener cel = (ConnectionEventListener) i
497: .next();
498: try {
499: cel.connectionErrorOccurred(ce);
500: } catch (Throwable t) {
501: }
502: }
503: }
504:
505: void connectionError(TestConnection handle, Exception e) {
506: if (destroyed.get())
507: return;
508:
509: log
510: .info("Connetion error handle=" + handle + " for "
511: + this , e);
512:
513: ConnectionEvent ce = new ConnectionEvent(this ,
514: ConnectionEvent.CONNECTION_ERROR_OCCURRED, e);
515: ce.setConnectionHandle(handle);
516: Collection copy = new ArrayList(listeners);
517: for (Iterator i = copy.iterator(); i.hasNext();) {
518: ConnectionEventListener cel = (ConnectionEventListener) i
519: .next();
520: try {
521: cel.connectionErrorOccurred(ce);
522: } catch (Throwable ignored) {
523: }
524: }
525: }
526:
527: void checkDestroyedResourceException() throws ResourceException {
528: if (destroyed.get())
529: throw new ResourceException("Already destroyed " + this );
530: }
531:
532: void checkDestroyedXAException() throws XAException {
533: if (destroyed.get())
534: throw new XAException("Already destroyed " + this );
535: }
536:
537: public synchronized String toString() {
538: StringBuffer buffer = new StringBuffer();
539: buffer.append("TestManagedConnection#").append(id);
540: buffer.append("{");
541: buffer.append("xid=").append(currentXid);
542: buffer.append(" destroyed=").append(destroyed.get());
543: buffer.append("}");
544: return buffer.toString();
545: }
546:
547: public void doSleep(long sleep) {
548: boolean interrupted = false;
549: try {
550: Thread.sleep(sleep);
551: } catch (InterruptedException e) {
552: interrupted = true;
553: }
554: if (interrupted)
555: Thread.currentThread().interrupt();
556: }
557:
558: public class GlobalXID {
559: byte[] gid;
560: int hashCode;
561: String toString;
562:
563: public GlobalXID(Xid xid) {
564: gid = xid.getGlobalTransactionId();
565:
566: for (int i = 0; i < gid.length; ++i)
567: hashCode += 37 * gid[i];
568: toString = new String(gid).trim();
569: }
570:
571: public int hashCode() {
572: return hashCode;
573: }
574:
575: public String toString() {
576: return toString;
577: }
578:
579: public boolean equals(Object obj) {
580: if (obj == null || obj instanceof GlobalXID == false)
581: return false;
582: GlobalXID other = (GlobalXID) obj;
583: return toString.equals(other.toString);
584: }
585: }
586: }
|