001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb.plugins.lock;
023:
024: import java.util.LinkedList;
025: import java.util.HashMap;
026: import java.util.ArrayList;
027:
028: import javax.transaction.Transaction;
029: import javax.transaction.Status;
030:
031: import org.jboss.invocation.Invocation;
032: import org.jboss.ejb.Container;
033: import org.jboss.ejb.EntityEnterpriseContext;
034: import org.jboss.monitor.LockMonitor;
035: import org.jboss.util.deadlock.DeadlockDetector;
036:
037: /**
* This class holds threads awaiting the transactional lock to be free
* in a fair FIFO transactional queue. Non-transactional threads
* are also put in this wait queue. Unlike SimplePessimisticEJBLock, which notifies all
* threads on transaction completion, this class pops the next waiting transaction from the queue
* and notifies only those threads associated with that transaction. This
* class should perform better than SimplePessimisticEJBLock on high contention loads.
044: *
045: * Holds all locks for entity beans, not used for stateful. <p>
046: *
047: * All BeanLocks have a reference count.
048: * When the reference count goes to 0, the lock is released from the
049: * id -> lock mapping.
050: *
* As of 04/10/2002, you can now specify in jboss.xml method attributes that define
* methods as read-only. Read-only methods (and read-only beans) will release transactional
* locks at the end of the invocation. This decreases the likelihood of deadlock and increases
* performance.
055: *
056: * FIXME marcf: we should get solid numbers on this locking, bench in multi-thread environments
057: * We need someone with serious SUN hardware to run this lock into the ground
058: *
059: * @author <a href="marc.fleury@jboss.org">Marc Fleury</a>
060: * @author <a href="bill@burkecentral.com">Bill Burke</a>
061: * @author <a href="pete@subx.com">Peter Murray</a>
062: *
063: * @version $Revision: 57209 $
064: */
065: public class QueuedPessimisticEJBLock extends BeanLockSupport {
066: private HashMap txLocks = new HashMap();
067: private LinkedList txWaitQueue = new LinkedList();
068:
069: private int txIdGen = 0;
070: protected LockMonitor lockMonitor = null;
071: /** A flag that disables the deadlock detection check */
072: protected boolean deadlockDetection = true;
073:
074: public void setContainer(Container container) {
075: this .container = container;
076: lockMonitor = container.getLockManager().getLockMonitor();
077: }
078:
079: public boolean getDeadlockDetection() {
080: return deadlockDetection;
081: }
082:
083: public void setDeadlockDetection(boolean flag) {
084: this .deadlockDetection = flag;
085: }
086:
087: private class TxLock {
088:
089: public Transaction waitingTx = null;
090: public int id = 0;
091: public String threadName;
092: public boolean isQueued;
093:
094: /**
095: * deadlocker is used by the DeadlockDetector
096: * It is the thread if the tx is null.
097: */
098: public Object deadlocker;
099:
100: public TxLock(Transaction trans) {
101: this .threadName = Thread.currentThread().toString();
102: this .waitingTx = trans;
103: if (trans == null) {
104: if (txIdGen < 0)
105: txIdGen = 0;
106: this .id = txIdGen++;
107: deadlocker = Thread.currentThread();
108: } else {
109: deadlocker = trans;
110: }
111: this .isQueued = true;
112: }
113:
114: public boolean equals(Object obj) {
115: if (obj == this )
116: return true;
117:
118: TxLock lock = (TxLock) obj;
119:
120: if (lock.waitingTx == null && this .waitingTx == null) {
121: return lock.id == this .id;
122: } else if (lock.waitingTx != null && this .waitingTx != null) {
123: return lock.waitingTx.equals(this .waitingTx);
124: }
125: return false;
126: }
127:
128: public int hashCode() {
129: return this .id;
130: }
131:
132: public String toString() {
133: StringBuffer buffer = new StringBuffer(100);
134: buffer.append("TXLOCK waitingTx=").append(waitingTx);
135: buffer.append(" id=").append(id);
136: buffer.append(" thread=").append(threadName);
137: buffer.append(" queued=").append(isQueued);
138: return buffer.toString();
139: }
140: }
141:
142: protected TxLock getTxLock(Transaction miTx) {
143: TxLock lock = null;
144: if (miTx == null) {
145: // There is no transaction
146: lock = new TxLock(null);
147: txWaitQueue.addLast(lock);
148: } else {
149: TxLock key = new TxLock(miTx);
150: lock = (TxLock) txLocks.get(key);
151: if (lock == null) {
152: txLocks.put(key, key);
153: txWaitQueue.addLast(key);
154: lock = key;
155: }
156: }
157: return lock;
158: }
159:
160: protected boolean isTxExpired(Transaction miTx) throws Exception {
161: if (miTx != null
162: && miTx.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
163: return true;
164: }
165: return false;
166: }
167:
168: public void schedule(Invocation mi) throws Exception {
169: boolean threadScheduled = false;
170: while (!threadScheduled) {
171: /* loop on lock wakeup and restart trying to schedule */
172: threadScheduled = doSchedule(mi);
173: }
174: }
175:
176: /**
177: * doSchedule(Invocation)
178: *
179: * doSchedule implements a particular policy for scheduling the threads coming in.
180: * There is always the spec required "serialization" but we can add custom scheduling in here
181: *
182: * Synchronizing on lock: a failure to get scheduled must result in a wait() call and a
183: * release of the lock. Schedulation must return with lock.
184: *
185: */
186: protected boolean doSchedule(Invocation mi) throws Exception {
187: boolean wasThreadScheduled = false;
188: Transaction miTx = mi.getTransaction();
189: boolean trace = log.isTraceEnabled();
190: this .sync();
191: try {
192: if (trace)
193: log.trace("Begin schedule, key=" + mi.getId());
194:
195: if (isTxExpired(miTx)) {
196: log.error("Saw rolled back tx=" + miTx);
197: throw new RuntimeException(
198: "Transaction marked for rollback, possibly a timeout");
199: }
200:
201: //Next test is independent of whether the context is locked or not, it is purely transactional
202: // Is the instance involved with another transaction? if so we implement pessimistic locking
203: long startWait = System.currentTimeMillis();
204: try {
205: wasThreadScheduled = waitForTx(miTx, trace);
206: if (wasThreadScheduled && lockMonitor != null) {
207: long endWait = System.currentTimeMillis()
208: - startWait;
209: lockMonitor.finishedContending(endWait);
210: }
211: } catch (Exception throwable) {
212: if (lockMonitor != null && isTxExpired(miTx)) {
213: lockMonitor.increaseTimeouts();
214: }
215: if (lockMonitor != null) {
216: long endWait = System.currentTimeMillis()
217: - startWait;
218: lockMonitor.finishedContending(endWait);
219: }
220: throw throwable;
221: }
222: } finally {
223: if (miTx == null // non-transactional
224: && wasThreadScheduled) {
225: // if this non-transctional thread was
226: // scheduled in txWaitQueue, we need to call nextTransaction
227: // Otherwise, threads in txWaitQueue will never wake up.
228: nextTransaction();
229: }
230: this .releaseSync();
231: }
232:
233: //If we reach here we are properly scheduled to go through so return true
234: return true;
235: }
236:
237: /**
238: * Wait until no other transaction is running with this lock.
239: *
240: * @return Returns true if this thread was scheduled in txWaitQueue
241: */
242: protected boolean waitForTx(Transaction miTx, boolean trace)
243: throws Exception {
244: boolean wasScheduled = false;
245: // Do we have a running transaction with the context?
246: // We loop here until either until success or until transaction timeout
247: // If we get out of the loop successfully, we can successfully
248: // set the transaction on this puppy.
249: TxLock txLock = null;
250: Object deadlocker = miTx;
251: if (deadlocker == null)
252: deadlocker = Thread.currentThread();
253:
254: while (getTransaction() != null &&
255: // And are we trying to enter with another transaction?
256: !getTransaction().equals(miTx)) {
257: // Check for a deadlock on every cycle
258: try {
259: if (deadlockDetection == true)
260: DeadlockDetector.singleton.deadlockDetection(
261: deadlocker, this );
262: } catch (Exception e) {
263: // We were queued, not any more
264: if (txLock != null && txLock.isQueued) {
265: txLocks.remove(txLock);
266: txWaitQueue.remove(txLock);
267: }
268: throw e;
269: }
270:
271: wasScheduled = true;
272: if (lockMonitor != null)
273: lockMonitor.contending();
274: // That's no good, only one transaction per context
275: // Let's put the thread to sleep the transaction demarcation will wake them up
276: if (trace)
277: log.trace("Transactional contention on context" + id);
278:
279: // Only queue the lock on the first iteration
280: if (txLock == null)
281: txLock = getTxLock(miTx);
282:
283: if (trace)
284: log.trace("Begin wait on Tx=" + getTransaction());
285:
286: // And lock the threads on the lock corresponding to the Tx in MI
287: synchronized (txLock) {
288: releaseSync();
289: try {
290: txLock.wait(txTimeout);
291: } catch (InterruptedException ignored) {
292: }
293: } // end synchronized(txLock)
294:
295: this .sync();
296:
297: if (trace)
298: log.trace("End wait on TxLock=" + getTransaction());
299: if (isTxExpired(miTx)) {
300: log.error(Thread.currentThread()
301: + "Saw rolled back tx=" + miTx
302: + " waiting for txLock"
303: // +" On method: " + mi.getMethod().getName()
304: // +" txWaitQueue size: " + txWaitQueue.size()
305: );
306: if (txLock.isQueued) {
307: // Remove the TxLock from the queue because this thread is exiting.
308: // Don't worry about notifying other threads that share the same transaction.
309: // They will timeout and throw the below RuntimeException
310: txLocks.remove(txLock);
311: txWaitQueue.remove(txLock);
312: } else if (getTransaction() != null
313: && getTransaction().equals(miTx)) {
314: // We're not qu
315: nextTransaction();
316: }
317: if (miTx != null) {
318: if (deadlockDetection == true)
319: DeadlockDetector.singleton
320: .removeWaiting(deadlocker);
321: }
322: throw new RuntimeException(
323: "Transaction marked for rollback, possibly a timeout");
324: }
325: } // end while(tx!=miTx)
326:
327: // If we get here, this means that we have the txlock
328: if (!wasScheduled) {
329: setTransaction(miTx);
330: }
331: return wasScheduled;
332: }
333:
334: /*
335: * nextTransaction()
336: *
337: * nextTransaction will
338: * - set the current tx to null
339: * - schedule the next transaction by notifying all threads waiting on the transaction
340: * - setting the thread with the new transaction so there is no race with incoming calls
341: */
342: protected void nextTransaction() {
343: if (synched == null) {
344: throw new IllegalStateException(
345: "do not call nextTransaction while not synched!");
346: }
347:
348: setTransaction(null);
349: // is there a waiting list?
350: if (!txWaitQueue.isEmpty()) {
351: TxLock thelock = (TxLock) txWaitQueue.removeFirst();
352: txLocks.remove(thelock);
353: thelock.isQueued = false;
354: // The new transaction is the next one, important to set it up to avoid race with
355: // new incoming calls
356: setTransaction(thelock.waitingTx);
357: // log.debug(Thread.currentThread()+" handing off to "+lock.threadName);
358: if (deadlockDetection == true)
359: DeadlockDetector.singleton
360: .removeWaiting(thelock.deadlocker);
361:
362: synchronized (thelock) {
363: // notify All threads waiting on this transaction.
364: // They will enter the methodLock wait loop.
365: thelock.notifyAll();
366: }
367: } else {
368: // log.debug(Thread.currentThread()+" handing off to empty queue");
369: }
370: }
371:
372: public void endTransaction(Transaction transaction) {
373: nextTransaction();
374: }
375:
376: public void wontSynchronize(Transaction trasaction) {
377: nextTransaction();
378: }
379:
380: /**
381: * releaseMethodLock
382: *
383: * if we reach the count of zero it means the instance is free from threads (and reentrency)
384: * we wake up the next thread in the currentLock
385: */
386: public void endInvocation(Invocation mi) {
387: // Do we own the lock?
388: Transaction tx = mi.getTransaction();
389: if (tx != null && tx.equals(getTransaction())) {
390: // If there is no context or synchronization, release the lock
391: EntityEnterpriseContext ctx = (EntityEnterpriseContext) mi
392: .getEnterpriseContext();
393: if (ctx == null || ctx.hasTxSynchronization() == false)
394: endTransaction(tx);
395: }
396: }
397:
398: public void removeRef() {
399: refs--;
400: if (refs == 0 && txWaitQueue.size() > 0) {
401: log.error("removing bean lock and it has tx's in QUEUE! "
402: + toString());
403: throw new IllegalStateException(
404: "removing bean lock and it has tx's in QUEUE!");
405: } else if (refs == 0 && getTransaction() != null) {
406: log.error("removing bean lock and it has tx set! "
407: + toString());
408: throw new IllegalStateException(
409: "removing bean lock and it has tx set!");
410: } else if (refs < 0) {
411: log
412: .error("negative lock reference count should never happen !");
413: throw new IllegalStateException(
414: "negative lock reference count !");
415: }
416: }
417:
418: public String toString() {
419: StringBuffer buffer = new StringBuffer(100);
420: buffer.append(super .toString());
421: buffer.append(", bean=").append(
422: container.getBeanMetaData().getEjbName());
423: buffer.append(", id=").append(id);
424: buffer.append(", refs=").append(refs);
425: buffer.append(", tx=").append(getTransaction());
426: buffer.append(", synched=").append(synched);
427: buffer.append(", timeout=").append(txTimeout);
428: buffer.append(", queue=").append(new ArrayList(txWaitQueue));
429: return buffer.toString();
430: }
431: }
|