001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.pm;
023:
024: import java.util.Iterator;
025: import java.util.Map;
026: import java.util.Set;
027:
028: import javax.jms.JMSException;
029: import javax.transaction.xa.Xid;
030:
031: import org.jboss.logging.Logger;
032: import org.jboss.mq.ConnectionToken;
033: import org.jboss.mq.Recoverable;
034: import org.jboss.mq.SpyJMSException;
035:
036: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
037: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
038:
039: /**
040: * This class allows provides the base for user supplied persistence packages.
041: *
042: * @author Hiram Chirino (Cojonudo14@hotmail.com)
043: * @author Paul Kendall (paul.kendall@orion.co.nz)
044: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
045: * @version $Revision: 57198 $
046: */
047: public class TxManager implements Recoverable {
048: /** The log */
049: private static final Logger log = Logger.getLogger(TxManager.class);
050:
051: /** The persistence manager */
052: PersistenceManager persistenceManager;
053:
054: /** Maps Global transactions to local transactions */
055: ConcurrentHashMap globalToLocal = new ConcurrentHashMap();
056:
057: /** Prepared Transactions Map<Xid, PreparedInfo<Tx>> */
058: ConcurrentHashMap prepared = new ConcurrentHashMap();
059:
060: /**
061: * Create a new TxManager
062: *
063: * @param pm the persistence manager
064: */
065: public TxManager(PersistenceManager pm) {
066: persistenceManager = pm;
067: }
068:
069: /**
070: * Return the local transaction id for a distributed transaction id.
071: *
072: * @deprecated
073: * @param dc the connection
074: * @param xid the transaction id
075: * @return The Prepared transaction
076: * @exception javax.jms.JMSException Description of Exception
077: */
078: public final Tx getPrepared(ConnectionToken dc, Object xid)
079: throws JMSException {
080: GlobalXID gxid = new GlobalXID(dc, xid);
081: Tx txid = (Tx) globalToLocal.get(gxid);
082: if (txid == null)
083: throw new SpyJMSException(
084: "Transaction does not exist from: "
085: + dc.getClientID() + " xid=" + xid);
086:
087: return txid;
088: }
089:
090: /**
091: * Create and return a unique transaction id.
092: *
093: * @return the transaction id
094: * @exception JMSException for any error
095: */
096: public final Tx createTx() throws JMSException {
097: Tx txId = persistenceManager.createPersistentTx();
098: return txId;
099: }
100:
101: /**
102: * Commit the transaction to the persistent store.
103: *
104: * @param txId the transaction
105: * @exception JMSException for any error
106: */
107: public final void commitTx(Tx txId) throws JMSException {
108: boolean trace = log.isTraceEnabled();
109: if (trace)
110: log.trace("Commit branch=" + txId.longValue());
111: txId.commit(persistenceManager);
112: }
113:
114: /**
115: * Commit the transaction to the persistent store.
116: *
117: * @param dc the connection token
118: * @param xid the transaction
119: * @exception JMSException for any error
120: */
121: public final void commitTx(ConnectionToken dc, Object xid)
122: throws JMSException {
123: boolean trace = log.isTraceEnabled();
124: GlobalXID gxid = new GlobalXID(dc, xid);
125: Tx txid = (Tx) globalToLocal.get(gxid);
126: if (txid == null) {
127: PreparedInfo preparedInfo = (PreparedInfo) prepared
128: .get(xid);
129: if (preparedInfo == null)
130: throw new SpyJMSException(
131: "Transaction does not exist from: "
132: + dc.getClientID() + " xid=" + xid);
133: Set txids = preparedInfo.getTxids();
134: for (Iterator i = txids.iterator(); i.hasNext();) {
135: txid = (Tx) i.next();
136: if (trace)
137: log.trace("Commit xid=" + xid + " branch="
138: + txid.longValue());
139: txid.commit(persistenceManager);
140: }
141: prepared.remove(xid);
142: } else {
143: if (trace)
144: log.trace("Commit xid=" + xid + " branch="
145: + txid.longValue());
146: txid.commit(persistenceManager);
147: }
148: }
149:
150: /**
151: * Add an operation for after a commit
152: *
153: * @param txId the transaction
154: * @param task the task
155: * @throws JMSException for any error
156: */
157: public void addPostCommitTask(Tx txId, Runnable task)
158: throws JMSException {
159: if (txId == null) {
160: task.run();
161: return;
162: }
163:
164: txId.addPostCommitTask(task);
165: }
166:
167: /**
168: * Rollback the transaction.
169: *
170: * @param txId the transaction
171: * @exception JMSException for any error
172: */
173: public void rollbackTx(Tx txId) throws JMSException {
174: boolean trace = log.isTraceEnabled();
175: if (trace)
176: log.trace("Rollback branch=" + txId.longValue());
177: txId.rollback(persistenceManager);
178: }
179:
180: /**
181: * Rollback the transaction
182: *
183: * @param dc the connection token
184: * @param xid the transaction
185: * @exception JMSException for any error
186: */
187: public final void rollbackTx(ConnectionToken dc, Object xid)
188: throws JMSException {
189: boolean trace = log.isTraceEnabled();
190: GlobalXID gxid = new GlobalXID(dc, xid);
191: Tx txid = (Tx) globalToLocal.get(gxid);
192: if (txid == null) {
193: PreparedInfo preparedInfo = (PreparedInfo) prepared
194: .get(xid);
195: if (preparedInfo == null)
196: throw new SpyJMSException(
197: "Transaction does not exist from: "
198: + dc.getClientID() + " xid=" + xid);
199: Set txids = preparedInfo.getTxids();
200: for (Iterator i = txids.iterator(); i.hasNext();) {
201: txid = (Tx) i.next();
202: if (trace)
203: log.trace("Rolling back xid=" + xid + " branch="
204: + txid.longValue());
205: txid.rollback(persistenceManager);
206: }
207: prepared.remove(xid);
208: } else {
209: if (trace)
210: log.trace("Rolling back xid=" + xid + " branch="
211: + txid.longValue());
212: txid.rollback(persistenceManager);
213: }
214: }
215:
216: /**
217: * Add an operation for after a rollback
218: *
219: * @param txId the transaction
220: * @param task the task
221: * @throws JMSException for any error
222: */
223: public void addPostRollbackTask(Tx txId, Runnable task)
224: throws JMSException {
225: if (txId == null)
226: return;
227:
228: txId.addPostRollbackTask(task);
229: }
230:
231: /**
232: * Create and return a unique transaction id. Given a distributed connection
233: * and a transaction id object, allocate a unique local transaction id if
234: * the remote id is not already known.
235: *
236: * @param dc the connection token
237: * @param xid the xid
238: * @return the transaction
239: * @exception JMSException for any error
240: */
241: public Tx createTx(ConnectionToken dc, Object xid)
242: throws JMSException {
243: GlobalXID gxid = new GlobalXID(dc, xid);
244: if (globalToLocal.containsKey(gxid))
245: throw new SpyJMSException("Duplicate transaction from: "
246: + dc.getClientID() + " xid=" + xid);
247:
248: Tx txId = createTx();
249: if (xid != null && xid instanceof Xid)
250: txId.setXid((Xid) xid);
251: globalToLocal.put(gxid, txId);
252:
253: //Tasks to remove the global to local mappings on commit/rollback
254: txId.addPostCommitTask(gxid);
255: txId.addPostRollbackTask(gxid);
256:
257: return txId;
258: }
259:
260: /**
261: * Restore a prepared transaction
262: *
263: * @param txId the transaction id
264: * @throws JMSException for any error
265: */
266: public void restoreTx(Tx txId) throws JMSException {
267: addPreparedTx(txId, txId.getXid(), true);
268: }
269:
270: /**
271: * Add a prepared transactions
272: *
273: * @param txId the transaction id
274: * @param xid the xid
275: * @param inDoubt whether it is in doubt
276: * @throws JMSException for any error
277: */
278: void addPreparedTx(Tx txId, Xid xid, boolean inDoubt)
279: throws JMSException {
280: PreparedInfo preparedInfo = (PreparedInfo) prepared.get(xid);
281: if (preparedInfo == null) {
282: preparedInfo = new PreparedInfo(xid, false);
283: prepared.put(xid, preparedInfo);
284: }
285: preparedInfo.add(txId);
286: if (inDoubt)
287: preparedInfo.setInDoubt(true);
288: }
289:
290: /**
291: * Mark the transaction branch as prepared
292: *
293: * @param dc the connection token
294: * @param xid the xid
295: * @param txId the transaction
296: * @throws JMSException for any error
297: */
298: public void markPrepared(ConnectionToken dc, Object xid, Tx txId)
299: throws JMSException {
300: try {
301: if (xid instanceof Xid)
302: addPreparedTx(txId, (Xid) xid, false);
303: } catch (Throwable t) {
304: SpyJMSException.rethrowAsJMSException(
305: "Error marking transaction as prepared xid=" + xid
306: + " tx=" + txId, t);
307: }
308: }
309:
310: public Xid[] recover(ConnectionToken dc, int flags)
311: throws Exception {
312: Set preparedXids = prepared.keySet();
313: Xid[] xids = (Xid[]) preparedXids.toArray(new Xid[preparedXids
314: .size()]);
315: return xids;
316: }
317:
318: /**
319: * Get the prepared transactions
320: *
321: * @return
322: */
323: public Map getPreparedTransactions() {
324: return prepared;
325: }
326:
327: /**
328: * A global transaction
329: */
330: class GlobalXID implements Runnable {
331: ConnectionToken dc;
332: Object xid;
333:
334: GlobalXID(ConnectionToken dc, Object xid) {
335: this .dc = dc;
336: this .xid = xid;
337: }
338:
339: public boolean equals(Object obj) {
340: if (obj == null) {
341: return false;
342: }
343: if (obj.getClass() != GlobalXID.class) {
344: return false;
345: }
346: return ((GlobalXID) obj).xid.equals(xid)
347: && ((GlobalXID) obj).dc.equals(dc);
348: }
349:
350: public int hashCode() {
351: return xid.hashCode();
352: }
353:
354: public void run() {
355: Tx txId = (Tx) globalToLocal.remove(this );
356: // Tidyup the prepared transactions
357: if (txId != null) {
358: PreparedInfo preparedInfo = (PreparedInfo) prepared
359: .get(xid);
360: if (preparedInfo != null) {
361: preparedInfo.remove(txId);
362: if (preparedInfo.isEmpty())
363: prepared.remove(xid);
364: }
365: }
366: }
367: }
368:
369: /**
370: * Information about a prepared global transaction
371: *
372: * @author <a href="adrian@jboss.com">Adrian Brock</a>
373: * @version $Revision: 57198 $
374: */
375: public static class PreparedInfo {
376: /** The XID */
377: private Xid xid;
378:
379: /** Whether the transaction is in doubt */
380: private boolean inDoubt;
381:
382: /** The local transaction branches */
383: private Set txids = new CopyOnWriteArraySet();
384:
385: /**
386: * Create a new PreparedInfo.
387: *
388: * @param xid the xid
389: * @param indoubt whether the transaction is in doubt
390: */
391: public PreparedInfo(Xid xid, boolean inDoubt) {
392: this .xid = xid;
393: this .inDoubt = inDoubt;
394: }
395:
396: /**
397: * Whether the transaction is in doubt
398: *
399: * @return true when in doubt
400: */
401: public boolean isInDoubt() {
402: return inDoubt;
403: }
404:
405: /**
406: * Set the in doubt
407: *
408: * @param inDoubt the in doubt value
409: */
410: public void setInDoubt(boolean inDoubt) {
411: this .inDoubt = inDoubt;
412: }
413:
414: /**
415: * Get the XID
416: *
417: * @return
418: */
419: public Xid getXid() {
420: return xid;
421: }
422:
423: /**
424: * Get the local branches
425: *
426: * @return the local branches
427: */
428: public Set getTxids() {
429: return txids;
430: }
431:
432: /**
433: * Add a local branch
434: *
435: * @param txid the local branch
436: */
437: public void add(Tx txid) {
438: txids.add(txid);
439: }
440:
441: /**
442: * Remove a local branch
443: *
444: * @param txid the local branch
445: */
446: public void remove(Tx txid) {
447: txids.remove(txid);
448: }
449:
450: /**
451: * Whether there are no local branches
452: *
453: * @return true when there no local branches
454: */
455: public boolean isEmpty() {
456: return txids.isEmpty();
457: }
458: }
459: }
|