001: ///////////////////////////////////////////////////////////////////////////////
002: //
003: // Copyright (C) 2003-@year@ by Thomas M. Hazel, MyOODB (www.myoodb.org)
004: //
005: // All Rights Reserved
006: //
007: // This program is free software; you can redistribute it and/or modify
008: // it under the terms of the GNU General Public License and GNU Library
009: // General Public License as published by the Free Software Foundation;
010: // either version 2, or (at your option) any later version.
011: //
012: // This program is distributed in the hope that it will be useful,
013: // but WITHOUT ANY WARRANTY; without even the implied warranty of
014: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: // GNU General Public License and GNU Library General Public License
016: // for more details.
017: //
018: // You should have received a copy of the GNU General Public License
019: // and GNU Library General Public License along with this program; if
020: // not, write to the Free Software Foundation, 675 Mass Ave, Cambridge,
021: // MA 02139, USA.
022: //
023: ///////////////////////////////////////////////////////////////////////////////
024: package org.myoodb.core;
025:
026: import java.util.*;
027:
028: import org.myoodb.*;
029: import org.myoodb.exception.*;
030: import org.myoodb.core.command.*;
031:
032: public final class TransactionManager extends AbstractManager {
// Logger used by this manager for debug/info/error output.
private static final org.myoodb.util.Logger LOGGER = org.myoodb.util.Logger
.getLogger(TransactionManager.class);

// Maps a transaction identifier (boxed as Long) to its AbstractTransaction.
// Its monitor is also the lock taken around accesses to m_threadTable.
protected HashMap m_txTable;
// Maps a Thread to the LinkedList of AbstractTransactions created on it;
// new transactions are appended, so the last element is the most recent one.
protected HashMap m_threadTable;
038:
039: public TransactionManager() {
040: m_txTable = new HashMap(32);
041: m_threadTable = new HashMap(32);
042: }
043:
044: protected boolean processCommand(AbstractTransaction tx,
045: AbstractCommand command, AbstractClient client)
046: throws Exception {
047: boolean result = true;
048:
049: Thread thread = Thread.currentThread();
050: if (thread instanceof TransactionThread) {
051: ((TransactionThread) thread).setTransaction(tx);
052: }
053:
054: try {
055: tx.setClient(client);
056: result = tx.processCommand(command);
057: } finally {
058: tx.setClient(null);
059: }
060:
061: return result;
062: }
063:
064: protected boolean prepareTransaction(AbstractTransaction tx,
065: AbstractCommand command) throws Exception {
066: if (tx == null) {
067: command.setResult(new TransactionException(
068: "Thread is not associated to a transaction(1)"));
069: return false;
070: } else if (tx.getStatus() != AbstractTransaction.STATUS_STARTED) {
071: command.setResult(new TransactionException(
072: "Transaction has improper status(1): "
073: + tx.getStatus()));
074: return false;
075: } else {
076: try {
077: tx.prepare();
078: return true;
079: } catch (Exception e) {
080: abortTransaction(tx, command);
081: command.setResult(new TransactionException(
082: "Caught during prepare: " + e, e));
083: return false;
084: }
085: }
086: }
087:
088: protected void commitTransaction(AbstractTransaction tx,
089: AbstractCommand command) throws Exception {
090: if (tx == null) {
091: command.setResult(new TransactionException(
092: "Thread is not associated to a transaction(2)"));
093: } else if (tx.getStatus() != AbstractTransaction.STATUS_PREPARED) {
094: command.setResult(new TransactionException(
095: "Transaction has improper status(2): "
096: + tx.getStatus()));
097: } else {
098: try {
099: tx.commit();
100: } catch (Exception e) {
101: command.setResult(e);
102:
103: throw e;
104: } finally {
105: notifyWaitingTransactions();
106: }
107: }
108: }
109:
110: protected void abortTransaction(AbstractTransaction tx,
111: AbstractCommand command) throws Exception {
112: // XXX: the 'command' parameter can be null on client disconnect!
113:
114: if (tx == null) {
115: if (command != null) {
116: command
117: .setResult(new TransactionException(
118: "Thread is not associated to a transaction(3)"));
119: }
120: } else if (tx.getStatus() > AbstractTransaction.STATUS_COMMITING) {
121: if (command != null) {
122: command.setResult(new TransactionException(
123: "Transaction has improper status(3): "
124: + tx.getStatus()));
125: }
126: } else {
127: try {
128: if (LOGGER.isDebugEnabled() == true) {
129: LOGGER.debug("Aborting AbstractTransaction: " + tx);
130: }
131:
132: tx.abort();
133: } catch (Exception e) {
134: if (command != null) {
135: command.setResult(e);
136: }
137:
138: throw e;
139: } finally {
140: notifyWaitingTransactions();
141: }
142: }
143: }
144:
145: public void startup() throws Exception {
146: }
147:
148: public void shutdown() throws Exception {
149: if (LOGGER.isInfoEnabled() == true) {
150: LOGGER.info("- There are " + m_txTable.size()
151: + " pending transactions");
152: }
153:
154: if (m_txTable.isEmpty() == false) {
155: if (LOGGER.isInfoEnabled() == true) {
156: LOGGER
157: .info(". Waiting 30 seconds for their completion.");
158: }
159:
160: java.util.Iterator iter = getTransactions().iterator();
161: while (iter.hasNext()) {
162: AbstractTransaction tx = (AbstractTransaction) iter
163: .next();
164:
165: if (LOGGER.isInfoEnabled() == true) {
166: LOGGER.info(" + " + tx);
167: }
168:
169: tx.stop();
170: }
171:
172: for (int msecs = 60000; (m_txTable.isEmpty() == false)
173: && (msecs != 0); msecs--) {
174: if (LOGGER.isInfoEnabled() == true) {
175: LOGGER.info(".");
176: }
177:
178: Thread.sleep(500);
179: }
180: }
181: }
182:
183: public AbstractLock newLock() {
184: return new Lock();
185: }
186:
187: public ArrayList getTransactions() {
188: ArrayList list = new ArrayList(m_txTable.size());
189:
190: synchronized (m_txTable) {
191: java.util.Iterator iter = m_txTable.values().iterator();
192: while (iter.hasNext()) {
193: AbstractTransaction tx = (AbstractTransaction) iter
194: .next();
195: list.add(tx);
196: }
197: }
198:
199: return list;
200: }
201:
202: public AbstractTransaction getTransaction(long transactionId) {
203: synchronized (m_txTable) {
204: return (AbstractTransaction) m_txTable.get(new Long(
205: transactionId));
206: }
207: }
208:
209: public AbstractTransaction getTransaction(Thread thread) {
210: AbstractTransaction result = null;
211:
212: synchronized (m_txTable) {
213: LinkedList txList = (LinkedList) m_threadTable.get(thread);
214: if ((txList != null) && (txList.size() != 0)) {
215: result = (AbstractTransaction) txList.getLast();
216: }
217: }
218:
219: return result;
220: }
221:
222: public LinkedList getTransactionList(Thread thread) {
223: return (LinkedList) m_threadTable.get(thread);
224: }
225:
226: public AbstractTransaction currentTransaction() {
227: return getTransaction(Thread.currentThread());
228: }
229:
230: public LinkedList currentTransactionList() {
231: return getTransactionList(Thread.currentThread());
232: }
233:
234: public AbstractTransaction createTransaction(User owner) {
235: Thread thread = Thread.currentThread();
236: AbstractTransaction tx = MyOodbManager.getTheManager()
237: .getStoreManager().createTransaction(owner);
238:
239: synchronized (m_txTable) {
240: LinkedList txList = (LinkedList) m_threadTable.get(thread);
241: if (txList == null) {
242: txList = new LinkedList();
243: m_threadTable.put(thread, txList);
244: } else {
245: tx.setParentTransaction((AbstractTransaction) txList
246: .getLast());
247: }
248:
249: txList.add(tx);
250: m_txTable.put(new Long(tx.getTransactionIdentifier()), tx);
251: }
252:
253: return tx;
254: }
255:
256: public void deleteTransaction(Thread thread, AbstractTransaction tx) {
257: synchronized (m_txTable) {
258: tx.stop();
259: m_txTable.remove(new Long(tx.getTransactionIdentifier()));
260:
261: LinkedList txList = (LinkedList) m_threadTable.get(thread);
262: if (txList != null) {
263: txList.remove(tx);
264:
265: if (txList.size() == 0) {
266: m_threadTable.remove(Thread.currentThread());
267: }
268: }
269: }
270: }
271:
272: public void notifyWaitingTransactions() {
273: java.util.Iterator iter = getTransactions().iterator();
274: while (iter.hasNext()) {
275: AbstractTransaction tx = (AbstractTransaction) iter.next();
276: synchronized (tx) {
277: tx.notifyAll();
278: }
279: }
280: }
281:
282: public void processCommand(AbstractCommand command, User user,
283: AbstractClient client) {
284: try {
285: if (command instanceof TransactionCommand) {
286: switch (((TransactionCommand) command).getRequest()) {
287: case TransactionCommand.REQUEST_BEGIN: {
288: AbstractTransaction tx = createTransaction(user);
289: tx
290: .setTransactionType(AbstractTransaction.EXPLICIT_TRANSACTION);
291: command.setResult(new Long(tx
292: .getTransactionIdentifier()));
293: break;
294: }
295: case TransactionCommand.REQUEST_COMMIT: {
296: AbstractTransaction tx = currentTransaction();
297: if (tx.getMaxLockLevel() > org.myoodb.MyOodbAccess.READ) {
298: if (prepareTransaction(tx, command)) {
299: commitTransaction(tx, command);
300: }
301: }
302:
303: deleteTransaction(Thread.currentThread(), tx);
304: break;
305: }
306: case TransactionCommand.REQUEST_ROLLBACK: {
307: AbstractTransaction tx = currentTransaction();
308: abortTransaction(tx, command);
309: deleteTransaction(Thread.currentThread(), tx);
310: break;
311: }
312: case TransactionCommand.REQUEST_STATUS: {
313: command.setResult(new Integer(currentTransaction()
314: .getStatus()));
315: break;
316: }
317: }
318: } else if (command instanceof LogoutCommand) {
319: // Close this threads transactions
320: LinkedList txList = getTransactionList(client
321: .getThread());
322: if (txList != null) {
323: // remove last created transaction first (list gets changed by deleteTransaction)
324: LinkedList tmpList = (LinkedList) txList.clone();
325: while (tmpList.size() != 0) {
326: AbstractTransaction tx = (AbstractTransaction) tmpList
327: .removeLast();
328: if (tx.getOwner().equals(user) == false) {
329: LOGGER
330: .error("Ignore AbstractTransaction (not owned): "
331: + tx);
332: continue;
333: } else if (tx.getStatus() <= AbstractTransaction.STATUS_COMMITING) {
334: abortTransaction(tx, command);
335: } else {
336: if (LOGGER.isInfoEnabled() == true) {
337: LOGGER
338: .info("Ignoring AbstractTransaction (committed/aborted): "
339: + tx);
340: }
341: }
342:
343: deleteTransaction(client.getThread(), tx);
344: }
345: }
346:
347: client.getThread().interrupt();
348: } else {
349: AbstractTransaction tx = currentTransaction();
350: if (tx == null) {
351: tx = createTransaction(user);
352: if (processCommand(tx, command, client)) {
353: if (tx.getMaxLockLevel() > org.myoodb.MyOodbAccess.READ) {
354: if (prepareTransaction(tx, command)) {
355: commitTransaction(tx, command);
356: }
357: }
358: }
359:
360: deleteTransaction(Thread.currentThread(), tx);
361: } else {
362: processCommand(tx, command, client);
363: }
364: }
365: } catch (Throwable e) {
366: AbstractTransaction tx = currentTransaction();
367:
368: if (tx != null) {
369: try {
370: abortTransaction(tx, command);
371: } catch (Exception ae) {
372: LOGGER.error(ae);
373: }
374:
375: try {
376: deleteTransaction(Thread.currentThread(), tx);
377: } catch (Exception de) {
378: LOGGER.error(de);
379: }
380: }
381:
382: if (e instanceof InternalException) {
383: if (command == null) {
384: throw (InternalException) e;
385: } else {
386: command.setResult(e);
387: }
388: } else {
389: if (command == null) {
390: throw new InternalException(
391: "Caught during process work: " + e, e);
392: } else {
393: command.setResult(new InternalException(
394: "Caught during process work: " + e, e));
395: }
396: }
397: }
398: }
399: }
|