001: /**
002: * Copyright (C) 2006 NetMind Consulting Bt.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 3 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package hu.netmind.persistence;
018:
019: import java.util.LinkedList;
020: import java.util.Iterator;
021: import java.util.Vector;
022: import java.sql.Connection;
023: import org.apache.log4j.Logger;
024: import java.util.HashSet;
025: import java.io.IOException;
026: import java.io.StringWriter;
027: import java.io.PrintWriter;
028:
029: /**
030: * This class keeps track of all transaction objects allocated.
031: * @author Brautigam Robert
032: * @version Revision: $Revision$
033: */
034: public class TransactionTracker {
035: public static int TX_REQUIRED = 1;
036: public static int TX_NEW = 2;
037: public static int TX_OPTIONAL = 3;
038:
039: private static Logger logger = Logger
040: .getLogger(TransactionTracker.class);
041:
042: StoreContext context;
043: private ThreadLocal transactions;
044: private LinkedList listeners;
045: private LinkedList allTransactions;
046: private HashSet allTransactionIds;
047: private Store store;
048: private ThreadLocal runningListeners;
049:
050: TransactionTracker(StoreContext context) {
051: this .context = context;
052: transactions = new ThreadLocal();
053: listeners = new LinkedList();
054: allTransactions = new LinkedList();
055: allTransactionIds = new HashSet();
056: runningListeners = new ThreadLocal();
057: }
058:
059: void registerTransactionId(Long serial) {
060: synchronized (allTransactions) {
061: allTransactionIds.add(serial);
062: }
063: }
064:
065: boolean hasOpenTransaction(Long serial) {
066: synchronized (allTransactions) {
067: return allTransactionIds.contains(serial);
068: }
069: }
070:
071: /**
072: * Mark all current transactions as rollback only.
073: */
074: public void markRollbackAll(StoreException ex) {
075: synchronized (allTransactions) {
076: for (int i = 0; i < allTransactions.size(); i++) {
077: Transaction tx = (Transaction) allTransactions.get(i);
078: tx.setRollbackException(ex);
079: }
080: }
081: }
082:
083: /**
084: * Get a transaction. Following modes are supported:
085: * <ul>
086: * <li>TX_REQUIRED: A new transaction is allocated if no current
087: * transaction exists, otherwise the current transaction is returned.</li>
088: * <li>TX_NEW: A new transaction is allocated either way.</li>
089: * <li>TX_OPTIONAL: If there is a current transaction, that is returned,
090: * null otherwise.</li>
091: * </ul>
092: * Note, that each transaction can support multiple levels of begin-commit
093: * blocks. Each transaction only commits/rollsback is the most outer
094: * block is commited/rolled back.
095: */
096: public Transaction getTransaction(int mode) {
097: LinkedList list = (LinkedList) transactions.get();
098: if (list == null) {
099: // No list yet, initialize threadlocal
100: list = new LinkedList();
101: transactions.set(list);
102: }
103: if ((list.size() == 0) && (mode == TX_OPTIONAL))
104: return null;
105: if ((list.size() == 0) || (mode == TX_NEW)) {
106: // No transaction, or new is explicitly required
107: Transaction transaction = new Transaction(this );
108: if (logger.isTraceEnabled())
109: logger
110: .trace("transaction allocation trace: "
111: + getStackTrace(transaction
112: .getAllocateTrace()));
113: transaction.setConnection(context.getDatabase()
114: .getConnectionSource().getConnection());
115: list.add(transaction);
116: synchronized (allTransactions) {
117: allTransactions.add(transaction);
118: }
119: }
120: return (Transaction) list.getLast();
121: }
122:
123: /**
124: * Send notifications.
125: * @param transaction The transaction which caused the event.
126: */
127: private void sendNotifications(Transaction transaction,
128: boolean commited) {
129: // Initialize running list if nescesssary. Running lists keep
130: // track of current threads listeners who are currently running.
131: // This is to prevent infinite recursions, inside a notification run
132: // a listener cannot cause any transacitions in which they will be
133: // called a second time.
134: HashSet runningList = (HashSet) runningListeners.get();
135: if (runningList == null) {
136: runningList = new HashSet();
137: runningListeners.set(runningList);
138: }
139: if (logger.isDebugEnabled())
140: logger.debug("sending notifications for " + transaction
141: + ", running listeners: " + runningList);
142: // Do notifications
143: Vector listenersCopy = null;
144: synchronized (listeners) {
145: listenersCopy = new Vector(listeners);
146: }
147: Iterator iterator = listenersCopy.iterator();
148: while (iterator.hasNext()) {
149: TransactionListener listener = (TransactionListener) iterator
150: .next();
151: if (!runningList.contains(listener)) {
152: runningList.add(listener); // Add to running list
153: try {
154: if (commited)
155: listener.transactionCommited(transaction); // Run listener
156: else
157: listener.transactionRolledback(transaction); // Run listener
158: } catch (Throwable e) {
159: logger.warn("a transaction listener threw error:",
160: e);
161: } finally {
162: runningList.remove(listener); // Remove from running list
163: }
164: }
165: }
166: }
167:
168: /**
169: * Internal commit, this is a callback from store.
170: */
171: void internalCommit(Transaction transaction) {
172: // First commit sql
173: Connection connection = transaction.getConnection();
174: try {
175: connection.commit();
176: try {
177: if (transaction.getTransactionIsolation() != Connection.TRANSACTION_READ_COMMITTED)
178: transaction
179: .setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
180: } catch (Exception e) {
181: logger
182: .warn(
183: "could not set transaction isolation back to READ_COMMITED.",
184: e);
185: }
186: } catch (Exception e) {
187: throw new StoreException("could not commit transaction", e);
188: } finally {
189: // Remove transaction from queue
190: removeTransaction(transaction);
191: }
192: // Send notifications
193: sendNotifications(transaction, true);
194: }
195:
196: /**
197: * Commit a transaction.
198: */
199: void commit(Transaction transaction) {
200: // If the transaction is internal, don't go through store
201: if ((transaction.getSerial() == null)
202: && (transaction.getSaveTables() != null)
203: && (transaction.getRemoveTables() != null)) {
204: internalCommit(transaction);
205: } else {
206: // Commit is going through Store
207: context.getStore().commit(transaction);
208: }
209: }
210:
211: void rollback(Transaction transaction) {
212: // If the transaction is internal, don't go through store
213: if ((transaction.getSerial() == null)
214: && (transaction.getSaveTables() != null)
215: && (transaction.getRemoveTables() != null)) {
216: internalRollback(transaction);
217: } else {
218: // Commit is going through Store
219: context.getStore().rollback(transaction);
220: }
221: }
222:
223: /**
224: * Rollback a transaction.
225: */
226: void internalRollback(Transaction transaction) {
227: // First rollback sql
228: Connection connection = transaction.getConnection();
229: try {
230: connection.rollback();
231: } catch (Exception e) {
232: throw new StoreException("could not roll back transaction",
233: e);
234: } finally {
235: // Remove transaction from queue
236: removeTransaction(transaction);
237: }
238: // Send notifications
239: sendNotifications(transaction, false);
240: }
241:
242: /**
243: * Remove transaction from queue. Also, if this transaction is not
244: * toplevel, then throw exception.
245: */
246: private void removeTransaction(Transaction transaction) {
247: // Remove from threadlocal list
248: LinkedList list = (LinkedList) transactions.get();
249: if ((list == null) || (list.size() == 0))
250: throw new StoreException(
251: "no transactions present, and tried to use one.");
252: if (!list.getLast().equals(transaction)) {
253: Transaction top = (Transaction) list.getLast();
254: logger
255: .warn("possible transaction leak, tried to commit/rollback transaction, which was not the currenct transaction, investigate! Top was: "
256: + top
257: + ", but tried to close: "
258: + transaction
259: + ". "
260: + "Last operation of top was: "
261: + top.getLastOperation()
262: + ", other's: "
263: + transaction.getLastOperation()
264: + ". "
265: + "Now follows the allocation trace of top and the transaction to be closed: \n"
266: + getStackTrace(top.getAllocateTrace())
267: + "\n"
268: + getStackTrace(transaction
269: .getAllocateTrace()));
270: throw new StoreException(
271: "tried to commit/rollback a transaction which is not the current transaction, possible transaction leak!");
272: }
273: // Give back connection
274: context.getDatabase().getConnectionSource().releaseConnection(
275: transaction.getConnection());
276: transaction.setConnection(null);
277: // Remove from list
278: list.removeLast();
279: // Remove from global list
280: synchronized (allTransactions) {
281: allTransactions.remove(transaction);
282: allTransactionIds.remove(transaction.getSerial());
283: }
284: }
285:
286: private String getStackTrace(Exception e) {
287: StringWriter trace = new StringWriter();
288: PrintWriter writer = new PrintWriter(trace);
289: e.printStackTrace(writer);
290: return trace.toString();
291: }
292:
293: /**
294: * Register a listener to this tracker.
295: */
296: public void addListener(TransactionListener listener) {
297: synchronized (listeners) {
298: if (!listeners.contains(listener))
299: listeners.add(listener);
300: }
301: }
302:
303: /**
304: * Remove a listener from this tracker.
305: */
306: public void removeListener(TransactionListener listener) {
307: synchronized (listeners) {
308: listeners.remove(listener);
309: }
310: }
311:
312: }
|