0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0004: * Copyright (C) 2005-2006 Continuent, Inc.
0005: * Contact: sequoia@continuent.org
0006: *
0007: * Licensed under the Apache License, Version 2.0 (the "License");
0008: * you may not use this file except in compliance with the License.
0009: * You may obtain a copy of the License at
0010: *
0011: * http://www.apache.org/licenses/LICENSE-2.0
0012: *
0013: * Unless required by applicable law or agreed to in writing, software
0014: * distributed under the License is distributed on an "AS IS" BASIS,
0015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0016: * See the License for the specific language governing permissions and
0017: * limitations under the License.
0018: *
0019: * Initial developer(s): Emmanuel Cecchet.
0020: * Contributor(s): ______________________.
0021: */package org.continuent.sequoia.controller.loadbalancer;
0022:
0023: import java.sql.SQLException;
0024: import java.sql.Statement;
0025: import java.util.ArrayList;
0026: import java.util.ConcurrentModificationException;
0027: import java.util.Iterator;
0028: import java.util.LinkedList;
0029: import java.util.List;
0030: import java.util.SortedSet;
0031:
0032: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0033: import org.continuent.sequoia.common.locks.DeadlockDetectionThread;
0034: import org.continuent.sequoia.common.locks.TransactionLogicalLock;
0035: import org.continuent.sequoia.common.log.Trace;
0036: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0037: import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0038: import org.continuent.sequoia.common.sql.schema.DatabaseTable;
0039: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0040: import org.continuent.sequoia.controller.connection.PooledConnection;
0041: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0042: import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0043: import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
0044: import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
0045: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0046: import org.continuent.sequoia.controller.requestmanager.RequestManager;
0047: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0048: import org.continuent.sequoia.controller.requests.AbstractRequest;
0049: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0050: import org.continuent.sequoia.controller.requests.CreateRequest;
0051: import org.continuent.sequoia.controller.requests.ParsingGranularities;
0052: import org.continuent.sequoia.controller.requests.SelectRequest;
0053: import org.continuent.sequoia.controller.requests.StoredProcedure;
0054: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0055:
0056: /**
0057: * This class defines task queues that stores the requests to be executed on a
0058: * database backend.
0059: *
0060: * @author <a href="mailto:emmanuel.cecchet@emicnetworks.com">Emmanuel Cecchet</a>
0061: * @version 1.0
0062: */
0063: public class BackendTaskQueues {
0064: /** Queue in which queries arrive in total order */
0065: private LinkedList totalOrderQueue;
0066: /**
0067: * Queue for stored procedures without semantic information (locking the whole
0068: * database)
0069: */
0070: private LinkedList storedProcedureQueue;
0071: /**
0072: * Queue for conflicting requests (only first request of the queue can be
0073: * executed)
0074: */
0075: private LinkedList conflictingRequestsQueue;
0076: /**
0077: * Queue for non-conflicting requests that can be executed in parallel, in any
0078: * order.
0079: */
0080: private LinkedList nonConflictingRequestsQueue;
0081: /** Backend these queues are attached to */
0082: private DatabaseBackend backend;
0083: private WaitForCompletionPolicy waitForCompletionPolicy;
0084: private RequestManager requestManager;
0085: private boolean allowTasksToBePosted;
0086: private final Object ALLOW_TASKS_SYNC = new Object();
0087:
0088: private DeadlockDetectionThread deadlockDetectionThread;
0089:
0090: // Number of stored procedures that have been posted in the queue and that
0091: // have not completed yet
0092: private int storedProcedureInQueue = 0;
0093:
0094: private int writesWithMultipleLocks = 0;
0095: private Trace logger;
0096:
0097: /**
0098: * Creates a new <code>BackendTaskQueues</code> object
0099: *
0100: * @param backend DatabaseBackend associated to these queues
0101: * @param waitForCompletionPolicy the load balancer wait for completion policy
0102: * @param requestManager the request manager associated with these queues
0103: */
0104: public BackendTaskQueues(DatabaseBackend backend,
0105: WaitForCompletionPolicy waitForCompletionPolicy,
0106: RequestManager requestManager) {
0107: this .backend = backend;
0108: this .logger = backend.getLogger();
0109: this .waitForCompletionPolicy = waitForCompletionPolicy;
0110: this .requestManager = requestManager;
0111: totalOrderQueue = new LinkedList();
0112: storedProcedureQueue = new LinkedList();
0113: conflictingRequestsQueue = new LinkedList();
0114: nonConflictingRequestsQueue = new LinkedList();
0115: allowTasksToBePosted = false;
0116: }
0117:
0118: /**
0119: * Abort all queries belonging to the provided transaction.
0120: *
0121: * @param tid the transaction identifier
0122: * @return true if a rollback is already in progress
0123: */
0124: public boolean abortAllQueriesForTransaction(long tid) {
0125: synchronized (this ) {
0126: boolean rollbackInProgress = abortAllQueriesEvenRunningInTransaction(
0127: tid, storedProcedureQueue);
0128: if (abortAllQueriesEvenRunningInTransaction(tid,
0129: conflictingRequestsQueue))
0130: rollbackInProgress = true;
0131: if (abortAllQueriesEvenRunningInTransaction(tid,
0132: nonConflictingRequestsQueue))
0133: rollbackInProgress = true;
0134: return rollbackInProgress;
0135: }
0136: }
0137:
0138: /**
0139: * Abort all queries belonging to the given transaction even if they are
0140: * currently processed by a BackendWorkerThread.
0141: *
0142: * @param tid transaction identifier
0143: * @param queue queue to scan for queries to abort
0144: * @return true if a rollback is already in progress
0145: */
0146: private boolean abortAllQueriesEvenRunningInTransaction(long tid,
0147: LinkedList queue) {
0148: boolean rollbackInProgress = false;
0149: synchronized (queue) {
0150: Long lTid = new Long(tid);
0151: for (Iterator iter = queue.iterator(); iter.hasNext();) {
0152: BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0153: .next();
0154: boolean isProcessing = false;
0155: AbstractTask task = entry.getTask();
0156: if (task.getTransactionId() == tid) {
0157: if (task instanceof RollbackTask)
0158: rollbackInProgress = true;
0159: else { /*
0160: * If we cancel a task in a transaction that was supposed to be
0161: * lazily started by that task, then we have to fake the transaction
0162: * start on this backend so that the transaction rollback and
0163: * subsequent check for priority inversion occurs on that backend.
0164: */
0165: if (!task.isAutoCommit()
0166: && !backend.isStartedTransaction(lTid))
0167: backend.startTransaction(lTid);
0168:
0169: if (logger.isDebugEnabled())
0170: logger.debug("Aborting request "
0171: + task.getRequest()
0172: + " on backend "
0173: + backend.getName());
0174:
0175: BackendWorkerThread processingThread = entry
0176: .getProcessingThread();
0177: if (processingThread != null) { // A thread is working on it, cancel the task
0178: isProcessing = true;
0179: Statement s = processingThread
0180: .getCurrentStatement();
0181: if (s != null) {
0182: try {
0183: s.cancel();
0184: } catch (SQLException e) {
0185: logger
0186: .warn(
0187: "Unable to cancel execution of request",
0188: e);
0189: } catch (NullPointerException e) {
0190: if (logger.isWarnEnabled())
0191: logger
0192: .warn(
0193: "Ignoring NullPointerException caused by Connector/J 5.0.4 bug #24721",
0194: e);
0195: }
0196: }
0197: }
0198: if (!task.hasCompleted()) {
0199: try {
0200: if (processingThread == null) { // abort has been called on a non-processed query, use a
0201: // random worker thread for notification
0202: processingThread = backend
0203: .getBackendWorkerThreadForNotification();
0204: if (processingThread == null) { // No worker thread left, should never happen.
0205: // Backend already disabled?
0206: logger
0207: .warn("No worker thread found for request abort notification, creating fake worker thread");
0208: processingThread = new BackendWorkerThread(
0209: backend,
0210: requestManager
0211: .getLoadBalancer());
0212: }
0213: }
0214: task
0215: .notifyFailure(
0216: processingThread,
0217: -1L,
0218: new SQLException(
0219: "Transaction aborted due to deadlock"));
0220: } catch (SQLException ignore) {
0221: }
0222: }
0223: if (!isProcessing) {
0224: /*
0225: * If the task was being processed by a thread, the completion
0226: * will be notified by the thread itself
0227: */
0228: completedEntryExecution(entry, iter);
0229: }
0230: }
0231: }
0232: }
0233: }
0234: return rollbackInProgress;
0235: }
0236:
0237: /**
0238: * Abort all requests remaining in the queues. This is usually called when the
0239: * backend is disabled and no backend worker thread should be processing any
0240: * request any more (this will generate a warning otherwise).
0241: */
0242: public void abortRemainingRequests() {
0243: setAllowTasksToBePosted(false);
0244: abortRemainingRequests(storedProcedureQueue);
0245: abortRemainingRequests(conflictingRequestsQueue);
0246: abortRemainingRequests(nonConflictingRequestsQueue);
0247: }
0248:
0249: /**
0250: * Abort the remaining request in the given queue
0251: *
0252: * @param queue the queue to purge
0253: */
0254: private void abortRemainingRequests(LinkedList queue) {
0255: synchronized (this ) {
0256: synchronized (queue) {
0257: for (Iterator iter = queue.iterator(); iter.hasNext();) {
0258: BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0259: .next();
0260: AbstractTask task = entry.getTask();
0261:
0262: // Do not cancel KillThreadTasks
0263: if (task instanceof KillThreadTask)
0264: continue;
0265:
0266: if (entry.getProcessingThread() != null) { // A thread is working on it, warn and cancel the task
0267: logger
0268: .warn("A worker thread was still processing task "
0269: + task
0270: + ", aborting the request execution.");
0271: Statement s = entry.getProcessingThread()
0272: .getCurrentStatement();
0273: if (s != null) {
0274: try {
0275: s.cancel();
0276: } catch (SQLException e) {
0277: logger
0278: .warn(
0279: "Unable to cancel execution of request",
0280: e);
0281: }
0282: }
0283: }
0284: if (!task.hasCompleted()) {
0285: if (logger.isDebugEnabled())
0286: logger.debug("Cancelling task " + task);
0287: task.notifyCompletion(entry
0288: .getProcessingThread());
0289: }
0290: completedEntryExecution(entry, iter);
0291: }
0292: }
0293: }
0294: }
0295:
0296: /**
0297: * Add a task at the end of the backend total order queue
0298: *
0299: * @param task the task to add
0300: */
0301: public final void addTaskToBackendTotalOrderQueue(AbstractTask task) {
0302: synchronized (this ) {
0303: synchronized (totalOrderQueue) {
0304: totalOrderQueue.addLast(task);
0305: }
0306:
0307: /*
0308: * Wake up all worker threads in case we post multiple tasks before a
0309: * thread had time to take this task into account (this would result in a
0310: * lost notified event).
0311: */
0312: this .notifyAll();
0313: }
0314: }
0315:
0316: /**
0317: * Add a task at the end of the backend total order queue. Block as long as
0318: * the total order queue size if over the indicated queue size.
0319: *
0320: * @param task the task to add
0321: * @param queueSize the maximum queue size
0322: */
0323: public final void addTaskToBackendTotalOrderQueue(
0324: AbstractTask task, int queueSize) {
0325: synchronized (this ) {
0326: boolean mustNotify = false;
0327: do {
0328: synchronized (totalOrderQueue) {
0329: if (totalOrderQueue.size() < queueSize) {
0330: totalOrderQueue.addLast(task);
0331: mustNotify = true;
0332: }
0333: }
0334:
0335: if (mustNotify) {
0336: /*
0337: * Wake up all worker threads in case we post multiple tasks before a
0338: * thread had time to take this task into account (this would result
0339: * in a lost notified event).
0340: */
0341: this .notifyAll();
0342: return; // exit method here
0343: } else {
0344: try { // Wait for queue to free an entry
0345: this .wait();
0346: } catch (InterruptedException e) {
0347: }
0348: }
0349: } while (!mustNotify);
0350: }
0351: }
0352:
0353: /**
0354: * Add a task in the ConflictingRequestsQueue.
0355: *
0356: * @param task task to add
0357: */
0358: private void addTaskInConflictingRequestsQueue(AbstractTask task) {
0359: addTaskToQueue(conflictingRequestsQueue, task, false);
0360: }
0361:
0362: /**
0363: * Add a task in the NonConflictingRequestsQueue.
0364: *
0365: * @param task task to add
0366: * @param isACommitOrRollback true if the task is a commit or a rollback
0367: */
0368: private void addTaskInNonConflictingRequestsQueue(
0369: AbstractTask task, boolean isACommitOrRollback) {
0370: addTaskToQueue(nonConflictingRequestsQueue, task,
0371: isACommitOrRollback);
0372: }
0373:
0374: /**
0375: * Add a task in the StoredProcedureQueue.
0376: *
0377: * @param task task to add
0378: */
0379: private void addTaskInStoredProcedureQueue(AbstractTask task) {
0380: addTaskToQueue(storedProcedureQueue, task, false);
0381: }
0382:
0383: /**
0384: * Add the task in the given queue and notify the queue. Note that the task is
0385: * also added to the backend pending write request queue. addTaskToQueue
0386: *
0387: * @param queue queue in which the task must be added
0388: * @param task the task to add
0389: * @param isACommitOrRollback true if the task is a commit or a rollback
0390: */
0391: private void addTaskToQueue(LinkedList queue, AbstractTask task,
0392: boolean isACommitOrRollback) {
0393: if (!allowTasksToBePosted()) {
0394: if (logger.isDebugEnabled())
0395: logger.debug("Cancelling task " + task);
0396: task.notifyCompletion(null);
0397: return;
0398: }
0399:
0400: // We assume that all requests here are writes
0401: backend.addPendingTask(task);
0402: if (logger.isDebugEnabled())
0403: logger.debug("Adding task " + task
0404: + " to pending request queue");
0405:
0406: synchronized (this ) {
0407: // Add to the queue
0408: synchronized (queue) {
0409: queue.addLast(new BackendTaskQueueEntry(task, queue,
0410: isACommitOrRollback));
0411: }
0412:
0413: /*
0414: * Wake up all worker threads in case we post multiple tasks before a
0415: * thread had time to take this task into account (this would result in a
0416: * lost notified event).
0417: */
0418: this .notifyAll();
0419: }
0420: }
0421:
0422: /**
0423: * Check for priority inversion in the conflicting queue or possibly stored
0424: * procedure queue flushing if a transaction executing a stored procedure has
0425: * just completed. remove this entry and possibly re-arrange the queues
0426: */
0427: public final void checkForPriorityInversion() {
0428: DatabaseSchema schema = backend.getDatabaseSchema();
0429:
0430: // Let's check the conflicting queue for priority inversion
0431: synchronized (conflictingRequestsQueue) {
0432: for (Iterator iter = conflictingRequestsQueue.iterator(); iter
0433: .hasNext();) {
0434: BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0435: .next();
0436:
0437: // If the entry is currently processed, don't try to move it else it
0438: // would be duplicated in the non-conflicting queue!!!
0439: if (entry.processingThread != null)
0440: continue;
0441:
0442: AbstractTask task = entry.getTask();
0443: if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) {
0444: if (task.getSuccess() + task.getFailed() > 0) { // Task has already been started by other nodes, just proceed with
0445: // it, we are late!
0446: if (logger.isDebugEnabled())
0447: logger
0448: .debug("Priority inversion for already started request "
0449: + task.getRequest());
0450: moveToNonConflictingQueue(iter, entry);
0451: continue;
0452: }
0453: }
0454:
0455: AbstractRequest request = task.getRequest();
0456: SortedSet lockedTables = request
0457: .getWriteLockedDatabaseTables();
0458: if (lockedTables != null) {
0459: boolean queryIsConflicting = false;
0460: for (Iterator iterator = lockedTables.iterator(); iterator
0461: .hasNext()
0462: && !queryIsConflicting;) {
0463: String tableName = (String) iterator.next();
0464: DatabaseTable table = schema.getTable(
0465: tableName, false);
0466: if (table == null) { // No table found, stay in the conflicting queue
0467: logger
0468: .warn("Unable to find table "
0469: + tableName
0470: + " in database schema, when checking priority inversion for query "
0471: + request
0472: .toStringShortForm(requestManager
0473: .getVirtualDatabase()
0474: .getSqlShortFormLength()));
0475: } else {
0476: /*
0477: * If the table we are conflicting with now belongs to us then we
0478: * can go in the non-conflicting queue. Note that it is not
0479: * possible for the lock to be free since we have acquired it
0480: * earlier and we are waiting for our turn.
0481: */
0482: TransactionLogicalLock lock = table
0483: .getLock();
0484: if (!lock.isLocked())
0485: logger
0486: .warn("Unexpected free lock on table "
0487: + table);
0488: else { // Check that the lock belong to our transaction
0489: queryIsConflicting = lock.getLocker() != task
0490: .getTransactionId();
0491: }
0492: }
0493: }
0494: if (!queryIsConflicting) { // Locks are now free, move to the non-conflicting queue
0495: // Do not try to take the lock again else it will not be released
0496: if (logger.isDebugEnabled())
0497: logger
0498: .debug("Priority inversion for request "
0499: + task.getRequest());
0500: moveToNonConflictingQueue(iter, entry);
0501: }
0502: } else { // Query does not lock anything, it should not have been posted in the
0503: // conflicting queue
0504: logger.warn("Non-locking task " + task
0505: + " was posted in conflicting queue");
0506: if (logger.isDebugEnabled())
0507: logger.debug("Priority inversion for request "
0508: + task.getRequest());
0509: moveToNonConflictingQueue(iter, entry);
0510: }
0511: }
0512: }
0513:
0514: // Look at the stored procedure queue
0515: synchronized (storedProcedureQueue) {
0516: for (Iterator iter = storedProcedureQueue.iterator(); iter
0517: .hasNext();) {
0518: BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0519: .next();
0520:
0521: TransactionLogicalLock globalLock = schema.getLock();
0522: AbstractTask task = entry.getTask();
0523: AbstractRequest request = task.getRequest();
0524: if (globalLock.isLocked()) { // Stored procedure is executing
0525: if (task.getTransactionId() == globalLock
0526: .getLocker()) {
0527: // Just wait for current transactions to complete if all locks are
0528: // not free.
0529: if (!schema
0530: .allTablesAreUnlockedOrLockedByTransaction(request))
0531: return;
0532:
0533: /*
0534: * We belong to the transaction that executes the stored procedure
0535: * (or to the auto commit request that holds the lock), let's go in
0536: * the non-conflicting request queue.
0537: */
0538: moveToNonConflictingQueue(iter, entry);
0539: // if we are in auto commit, it means that we are the stored
0540: // procedure which has acquired the lock during the atomic post
0541: if (task.isAutoCommit())
0542: return;
0543: continue;
0544: } else { // Check if the stored procedure currently executing is not
0545: // somewhere in the stored procedure queue.
0546:
0547: boolean currentStoredProcedureInQueue = false;
0548: for (Iterator iter2 = storedProcedureQueue
0549: .iterator(); iter2.hasNext();) {
0550: BackendTaskQueueEntry entry2 = (BackendTaskQueueEntry) iter2
0551: .next();
0552: AbstractTask task2 = entry2.getTask();
0553: if ((task2 != null)
0554: && (task2.getTransactionId() == globalLock
0555: .getLocker()))
0556: currentStoredProcedureInQueue = true;
0557: }
0558:
0559: // If the stored procedure is not in the queue then it is currently
0560: // executing and we have to wait for its completion
0561: if (!currentStoredProcedureInQueue)
0562: return;
0563: }
0564: }
0565:
0566: // Schema is not locked, no stored procedure currently executes
0567: TransactionMetaData tm = getTransactionMetaData(request);
0568:
0569: if ((request instanceof SelectRequest)
0570: || (request instanceof AbstractWriteRequest)) {
0571: SortedSet writeLockedTables = request
0572: .getWriteLockedDatabaseTables();
0573:
0574: if (writeLockedTables == null
0575: || writeLockedTables.isEmpty()) { // This request does not lock anything
0576: moveToNonConflictingQueue(iter, entry);
0577: continue;
0578: }
0579:
0580: moveMultipleWriteLocksQuery(schema, iter, entry,
0581: task, request, tm);
0582: } else {
0583: if (request instanceof StoredProcedure) {
0584: StoredProcedure sp = (StoredProcedure) request;
0585: DatabaseProcedureSemantic semantic = sp
0586: .getSemantic();
0587: if (semantic != null) {
0588: // Try to optimize the stored procedure execution based on its
0589: // semantic information
0590: if (semantic.isCommutative()
0591: || semantic.isReadOnly()
0592: || (request
0593: .getWriteLockedDatabaseTables() == null))
0594: moveToNonConflictingQueue(iter, entry);
0595: else
0596: moveMultipleWriteLocksQuery(schema,
0597: iter, entry, task, request, tm);
0598: continue;
0599: }
0600: }
0601:
0602: // Stored procedure or unknown query, take the global lock and proceed
0603: // if all other locks are free.
0604:
0605: globalLock.acquire(request);
0606: if (tm != null) {
0607: List acquiredLocks = tm
0608: .getAcquiredLocks(backend);
0609: if ((acquiredLocks == null)
0610: || !acquiredLocks.contains(globalLock))
0611: tm.addAcquiredLock(backend, globalLock);
0612: } else {
0613: ArrayList globalLockList = new ArrayList();
0614: globalLockList.add(globalLock);
0615: task.setLocks(backend, globalLockList);
0616: }
0617:
0618: // Just wait for current transactions to complete if all locks are not
0619: // free.
0620: if (!schema
0621: .allTablesAreUnlockedOrLockedByTransaction(request))
0622: return;
0623:
0624: // Clear to go, all locks are free. Acquire the global lock and move
0625: // to the non-conflicting queue.
0626: moveToNonConflictingQueue(iter, entry);
0627: continue;
0628: }
0629: }
0630: }
0631: }
0632:
0633: private void moveMultipleWriteLocksQuery(DatabaseSchema schema,
0634: Iterator iter, BackendTaskQueueEntry entry,
0635: AbstractTask task, AbstractRequest request,
0636: TransactionMetaData tm) {
0637: /*
0638: * Assume that we will get all locks and that we will execute in the
0639: * non-conflicting queue. If there is any issue, the queue will be set to
0640: * conflicting queue.
0641: */
0642: boolean allLocksAcquired = true;
0643: for (Iterator lockIter = request.getWriteLockedDatabaseTables()
0644: .iterator(); lockIter.hasNext();) {
0645: String tableName = (String) lockIter.next();
0646: DatabaseTable table = schema.getTable(tableName, false);
0647: if (table == null) { // No table found, let's go for the conflicting queue
0648: logger.warn("Unable to find table "
0649: + tableName
0650: + " in database schema, scheduling query "
0651: + request.toStringShortForm(requestManager
0652: .getVirtualDatabase()
0653: .getSqlShortFormLength())
0654: + " in conflicting queue.");
0655: allLocksAcquired = false;
0656: } else { /*
0657: * If we get the lock we go in the non conflicting queue else we go in
0658: * the conflicting queue
0659: */
0660: TransactionLogicalLock tableLock = table.getLock();
0661: if (!tableLock.acquire(request))
0662: allLocksAcquired = false;
0663: /*
0664: * Make sure that the lock is added only once to the list especially if
0665: * multiple backends execute this piece of code when checking for
0666: * priority inversion in their own queue (if the lock was already
0667: * acquired, tableLock.acquire() returns directly true)
0668: */
0669: if (tm != null) {
0670: List acquiredLocks = tm.getAcquiredLocks(backend);
0671: if ((acquiredLocks == null)
0672: || !acquiredLocks.contains(tableLock))
0673: tm.addAcquiredLock(backend, tableLock);
0674: } else {
0675: List tableLockList = task.getLocks(backend);
0676: if (tableLockList == null)
0677: tableLockList = new ArrayList();
0678: // There is no need to synchronize on task.getLocks because we are in
0679: // mutual exclusion here in a synchronized block on synchronized
0680: // (atomicPostSyncObject) that has been taken by the caller of this
0681: // method.
0682: if (!tableLockList.contains(tableLock)) {
0683: tableLockList.add(tableLock);
0684: task.setLocks(backend, tableLockList);
0685: }
0686: }
0687: }
0688: }
0689: // if we acquired all locks, we can go to the non conflicting queue
0690: if (allLocksAcquired)
0691: moveToNonConflictingQueue(iter, entry);
0692: else
0693: moveToConflictingQueue(iter, entry);
0694: }
0695:
0696: private void moveToConflictingQueue(Iterator iter,
0697: BackendTaskQueueEntry entry) {
0698: iter.remove();
0699: if (logger.isDebugEnabled())
0700: logger.debug("Moving " + entry.getTask()
0701: + " to conflicting queue");
0702: synchronized (conflictingRequestsQueue) {
0703: entry.setQueue(conflictingRequestsQueue);
0704: conflictingRequestsQueue.addLast(entry);
0705: }
0706: }
0707:
0708: private void moveToNonConflictingQueue(Iterator iter,
0709: BackendTaskQueueEntry entry) {
0710: iter.remove();
0711: if (logger.isDebugEnabled())
0712: logger.debug("Moving " + entry.getTask()
0713: + " to non conflicting queue");
0714: synchronized (nonConflictingRequestsQueue) {
0715: entry.setQueue(nonConflictingRequestsQueue);
0716: nonConflictingRequestsQueue.addLast(entry);
0717: }
0718: }
0719:
0720: private static final int UNASSIGNED_QUEUE = -1;
0721: private static final int CONFLICTING_QUEUE = 0;
0722: private static final int NON_CONFLICTING_QUEUE = 1;
0723: private static final int STORED_PROCEDURE_QUEUE = 2;
0724: private final Object atomicPostSyncObject = new Object();
0725:
0726: /**
0727: * Lock list variable is set by getQueueAndWriteLockTables and retrieved by
0728: * atomicTaskPostInQueueAndReleaseLock. This is safe since this happens in the
0729: * synchronized (ATOMIC_POST_SYNC_OBJECT) block.
0730: */
0731: private ArrayList lockList = null;
0732:
0733: /**
0734: * Fetch the next task from the backend total order queue and post it to one
0735: * of the queues (conflicting or not).
0736: * <p>
0737: * Note that this method must be called within a synchronized block on this.
0738: *
0739: * @return true if an entry was processed, false if there is the total order
0740: * queue is empty.
0741: */
0742: private boolean fetchNextQueryFromBackendTotalOrderQueue() {
0743: DatabaseSchema schema = backend.getDatabaseSchema();
0744: TransactionMetaData tm = null;
0745: int queueToUse = UNASSIGNED_QUEUE;
0746:
0747: AbstractTask task;
0748: AbstractRequest request;
0749:
0750: // Fetch first task from queue
0751: synchronized (totalOrderQueue) {
0752: if (totalOrderQueue.isEmpty())
0753: return false;
0754: task = (AbstractTask) totalOrderQueue.removeFirst();
0755:
0756: if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) { /*
0757: * If asynchronous execution is allowed, we have to ensure that queries
0758: * of the same transaction are executed in order that is only one at a
0759: * time. We also have to ensure that late queries execute before new
0760: * queries accessing the same resources.
0761: */
0762:
0763: /*
0764: * SYNCHRONIZATION: this check has to be performed in a synchronized
0765: * block to avoid race conditions with terminating taks that perform
0766: * priority inversions. Such operations move tasks across the backend
0767: * queues and may invalidate the check.
0768: */
0769: synchronized (atomicPostSyncObject) {
0770: while (mustWaitForLateTask(task)) {
0771: totalOrderQueue.addFirst(task); // Put back request in queue
0772: // Behave as an empty queue, we will be notified when the blocking
0773: // query has completed
0774: return false;
0775: }
0776: }
0777: }
0778:
0779: // Now process the task
0780: request = task.getRequest();
0781: if (request == null || task instanceof BeginTask) {
0782: addTaskInNonConflictingRequestsQueue(task, !task
0783: .isAutoCommit());
0784: return true;
0785: } else { // Parse the request if needed, should only happen at recover time
0786: try {
0787: if (!request.isParsed())
0788: request.parse(backend.getDatabaseSchema(),
0789: ParsingGranularities.TABLE, false);
0790: } catch (SQLException e) {
0791: logger.warn("Parsing of request " + request
0792: + " failed in recovery process", e);
0793: }
0794: }
0795: if (backend.isReplaying()) {
0796: /*
0797: * Read-only stored procedures can get logged when the schema is
0798: * unavailable, because no backends are enabled. They do not need to be
0799: * replayed and they may slow down recovery significantly.
0800: */
0801: if (request instanceof StoredProcedure) {
0802: StoredProcedure sp = (StoredProcedure) request;
0803: DatabaseProcedureSemantic semantic = sp
0804: .getSemantic();
0805: if (semantic != null && semantic.isReadOnly()) {
0806: task.notifySuccess(null);
0807: synchronized (this ) {// The RecoverThread may be waiting to add a request
0808: notifyAll();
0809: }
0810: return true;
0811: }
0812: }
0813: }
0814: if (!request.isAutoCommit()) { // Retrieve the transaction marker metadata
0815: try {
0816: tm = requestManager
0817: .getTransactionMetaData(new Long(request
0818: .getTransactionId()));
0819: } catch (SQLException e) {
0820: // We didn't start or lazy start the transaction
0821: if (logger.isDebugEnabled())
0822: logger
0823: .debug("No transaction medatada found for transaction "
0824: + request.getTransactionId());
0825: }
0826: }
0827:
0828: if (schema == null) {
0829: try {
0830: task.notifyFailure((BackendWorkerThread) Thread
0831: .currentThread(), 0, new SQLException(
0832: "No schema available to perform request locking on backend "
0833: + backend.getName()));
0834: } catch (SQLException ignore) {
0835: // Wait interrupted in notifyFailure
0836: }
0837: return true;
0838: }
0839:
0840: synchronized (atomicPostSyncObject) {
0841: lockList = null;
0842:
0843: boolean requestIsAStoredProcedure = request instanceof StoredProcedure;
0844: if (requestIsAStoredProcedure)
0845: storedProcedureInQueue++;
0846:
0847: SortedSet writeLockedTables = request
0848: .getWriteLockedDatabaseTables();
0849: if ((writeLockedTables != null)
0850: && (writeLockedTables.size() > 1))
0851: writesWithMultipleLocks++;
0852:
0853: // Check if a stored procedure is locking the database
0854: TransactionLogicalLock globalLock = schema.getLock();
0855: if (globalLock.isLocked()) {
0856: if (request.isAutoCommit()) {
0857: queueToUse = STORED_PROCEDURE_QUEUE;
0858: } else {
0859: /*
0860: * If we are the transaction executing the stored procedure, then we
0861: * can proceed in the conflicting queue.
0862: */
0863: if (globalLock.getLocker() == request
0864: .getTransactionId())
0865: queueToUse = NON_CONFLICTING_QUEUE;
0866: else {
0867: /*
0868: * If we are one of the transactions that already has acquired
0869: * locks then we should try to complete our transaction else we
0870: * stack in the stored procedure queue.
0871: */
0872: if ((tm == null)
0873: || (tm.getAcquiredLocks(backend) == null)) {
0874: // No locks taken so far, or transaction not [lazy] started =>
0875: // go in the stored procedure queue
0876: queueToUse = STORED_PROCEDURE_QUEUE;
0877: }
0878: }
0879: }
0880: }
0881:
0882: if (queueToUse == UNASSIGNED_QUEUE) { // No stored procedure or transaction that started before the stored
0883: // procedure was posted
0884: if (request instanceof AbstractWriteRequest
0885: && !((AbstractWriteRequest) request)
0886: .requiresGlobalLock()) {
0887: try {
0888: queueToUse = getQueueAndWriteLockTables(
0889: request, schema, tm);
0890: } catch (SQLException e) {
0891: try {
0892: task.notifyFailure(
0893: (BackendWorkerThread) Thread
0894: .currentThread(), 0, e);
0895: } catch (SQLException ignore) {
0896: // Wait interrupted in notifyFailure
0897: }
0898: return true;
0899: }
0900: } else if (request instanceof SelectRequest) {
0901: /*
0902: * Note that SelectRequest scheduling is a little bit tricky to
0903: * understand. Basically, we should just allow one select request at
0904: * a time. If they are in different transactions, this is fine they
0905: * will be properly isolated by the underlying database and queries
0906: * from the same transaction are guaranteed to be executed in order
0907: * (therefore they will go to the non-conflicting queue since their
0908: * write lock set is null). If SELECT is in autocommit, we ensure
0909: * that only one autocommit request is executed at a time, so
0910: * finally we are safe in all cases. SELECT...FOR UPDATE are treated
0911: * as writes since their write lock tables is set accordingly.
0912: */
0913: try {
0914: queueToUse = getQueueAndWriteLockTables(
0915: request, schema, tm);
0916: } catch (SQLException e) {
0917: try {
0918: task.notifyFailure(
0919: (BackendWorkerThread) Thread
0920: .currentThread(), 0, e);
0921: } catch (SQLException ignore) {
0922: // Wait interrupted in notifyFailure
0923: }
0924: return true;
0925: }
0926: } else {
0927: if (requestIsAStoredProcedure) {
0928: StoredProcedure sp = (StoredProcedure) request;
0929: DatabaseProcedureSemantic semantic = sp
0930: .getSemantic();
0931: if (semantic != null) {
0932: // Try to optimize the stored procedure execution based on its
0933: // semantic information
0934: if (semantic.isReadOnly()
0935: || (request
0936: .getWriteLockedDatabaseTables() == null))
0937: queueToUse = NON_CONFLICTING_QUEUE;
0938: else {
0939: try {
0940: queueToUse = getQueueAndWriteLockTables(
0941: request, schema, tm);
0942: } catch (SQLException e) {
0943: try {
0944: task
0945: .notifyFailure(
0946: (BackendWorkerThread) Thread
0947: .currentThread(),
0948: 0, e);
0949: } catch (SQLException ignore) {
0950: // Wait interrupted in notifyFailure
0951: }
0952: return true;
0953: }
0954: if (semantic.isCommutative())
0955: queueToUse = NON_CONFLICTING_QUEUE;
0956: }
0957: }
0958: }
0959:
0960: if (queueToUse == UNASSIGNED_QUEUE) {
0961: /*
0962: * Stored procedure or unknown query, let's assume it blocks the
0963: * whole database. Check if we can lock everything else we wait
0964: * for all locks to be free.
0965: */
0966: if (!globalLock.isLocked()) { // Lock the whole database so that we can execute when all
0967: // locks are released
0968: globalLock.acquire(request);
0969: if (tm != null)
0970: tm.addAcquiredLock(backend,
0971: globalLock);
0972: else {
0973: if (lockList == null)
0974: lockList = new ArrayList();
0975: lockList.add(globalLock);
0976: }
0977: if (schema
0978: .allTablesAreUnlockedOrLockedByTransaction(request))
0979: // Clear to go, all locks are free
0980: queueToUse = NON_CONFLICTING_QUEUE;
0981: else
0982: // We will have to wait for everyone to release its locks
0983: queueToUse = STORED_PROCEDURE_QUEUE;
0984: } else { /*
0985: * A stored procedure is holding the lock but we are in a
0986: * transaction that already acquired locks so we are authorized
0987: * to complete.
0988: */
0989: if (schema
0990: .allTablesAreUnlockedOrLockedByTransaction(request)) {
0991: queueToUse = NON_CONFLICTING_QUEUE;
0992: List locks = schema
0993: .lockAllTables(request);
0994: if (tm != null)
0995: tm.addAcquiredLocks(backend,
0996: locks);
0997: else {
0998: if (lockList == null)
0999: lockList = new ArrayList();
1000: lockList.add(locks);
1001: }
1002: } else { /*
1003: * We will have to wait for the completion of the transaction
1004: * of the stored procedure currently holding the global lock.
1005: */
1006: queueToUse = STORED_PROCEDURE_QUEUE;
1007: }
1008: }
1009: }
1010: }
1011: }
1012:
1013: if (queueToUse == NON_CONFLICTING_QUEUE) {
1014: if (logger.isDebugEnabled())
1015: logger.debug("Scheduling request " + request
1016: + " in non conflicting queue");
1017: addTaskInNonConflictingRequestsQueue(task, false);
1018: } else if (queueToUse == CONFLICTING_QUEUE) {
1019: if (logger.isDebugEnabled())
1020: logger.debug("Scheduling request " + request
1021: + " in conflicting queue");
1022: addTaskInConflictingRequestsQueue(task);
1023: } else if (queueToUse == STORED_PROCEDURE_QUEUE) {
1024: if (logger.isDebugEnabled())
1025: logger.debug("Scheduling request " + request
1026: + " in stored procedure queue");
1027: addTaskInStoredProcedureQueue(task);
1028: }
1029:
1030: task.setLocks(backend, lockList);
1031: } // synchronized (atomicPostSyncObject)
1032: } // synchronized (totalOrderQueue)
1033:
1034: return true;
1035: }
1036:
1037: /**
1038: * Schedule a query that takes write on multiple tables and tell in which
1039: * queue the task should be posted. This updates the conflictingTable above if
1040: * the query must be posted in the CONFLICTING_QUEUE and this always update
1041: * the list of locks taken by this request (either directly in tm lock list if
1042: * tm is not null, or by updating lockList defined above)
1043: *
1044: * @param request the request to schedule
1045: * @param schema the current database schema containing lock information
1046: * @param tm the transaction marker metadata (null if request is autocommit)
1047: * @return the queue to use (NON_CONFLICTING_QUEUE or CONFLICTING_QUEUE)
1048: * @throws SQLException if a table is not found in the schema and
1049: * enforceTableExistenceIntoSchema is set to true for the VDB
1050: */
1051: private int getQueueAndWriteLockTables(AbstractRequest request,
1052: DatabaseSchema schema, TransactionMetaData tm)
1053: throws SQLException {
1054: SortedSet writeLockedTables = request
1055: .getWriteLockedDatabaseTables();
1056:
1057: if (writeLockedTables == null || writeLockedTables.isEmpty()) { // This request does not lock anything
1058: return NON_CONFLICTING_QUEUE;
1059: } else if (request.isCreate() && writeLockedTables.size() == 1) { // This request does not lock anything
1060: // create table : we do not need to execute
1061: // in conflicting queue, but we have to lock the table for recovery
1062: // operations (that are done in a parallel way)
1063: return NON_CONFLICTING_QUEUE;
1064: }
1065:
1066: /*
1067: * Assume that we will get all locks and that we will execute in the
1068: * non-conflicting queue. If there is any issue, the queue will be set to
1069: * conflicting queue.
1070: */
1071: int queueToUse = NON_CONFLICTING_QUEUE;
1072: for (Iterator iter = writeLockedTables.iterator(); iter
1073: .hasNext();) {
1074: String tableName = (String) iter.next();
1075: DatabaseTable table = schema.getTable(tableName, false);
1076: if (table == null) { // table not found in the database schema.
1077: if (request.isCreate()
1078: && tableName.equals(((CreateRequest) request)
1079: .getTableName())) {
1080: // We are trying to create a table, so it can obiously not be found in
1081: // the database schema. Let's go for the conflicting queue.
1082: logger.warn("Creating table "
1083: + tableName
1084: + ", scheduling query "
1085: + request.toStringShortForm(requestManager
1086: .getVirtualDatabase()
1087: .getSqlShortFormLength())
1088: + " in conflicting queue.");
1089: queueToUse = CONFLICTING_QUEUE;
1090: continue;
1091: }
1092:
1093: // Check if it is a session-dependant temporary table that could not be
1094: // found in the database schema
1095: if (!request.isAutoCommit()) {
1096: PooledConnection pc = backend.getConnectionManager(
1097: request.getLogin())
1098: .retrieveConnectionForTransaction(
1099: request.getTransactionId());
1100: if (pc != null
1101: && pc.existsTemporaryTable(tableName))
1102: continue;
1103: } else if (request.isPersistentConnection()) {
1104: try {
1105: PooledConnection pc = backend
1106: .getConnectionManager(
1107: request.getLogin())
1108: .retrieveConnectionInAutoCommit(request);
1109: if (pc != null
1110: && pc.existsTemporaryTable(tableName))
1111: continue;
1112: } catch (UnreachableBackendException e) {
1113: }
1114: }
1115:
1116: // At this point, we did not find the table either in the database
1117: // schema, nor inside the connection's context.
1118: if (!request.tableExistenceCheckIsDisabled()
1119: && requestManager.getVirtualDatabase()
1120: .enforceTableExistenceIntoSchema()) {
1121: String errMsg = "Unable to find table "
1122: + tableName
1123: + " in database schema, rejecting query "
1124: + request.toStringShortForm(requestManager
1125: .getVirtualDatabase()
1126: .getSqlShortFormLength()) + ".";
1127: logger.warn(errMsg);
1128: throw new SQLException(errMsg);
1129: } else {
1130: // No table found, let's go for the conflicting queue
1131: logger.warn("Unable to find table "
1132: + tableName
1133: + " in database schema, scheduling query "
1134: + request.toStringShortForm(requestManager
1135: .getVirtualDatabase()
1136: .getSqlShortFormLength())
1137: + " in conflicting queue.");
1138: queueToUse = CONFLICTING_QUEUE;
1139: }
1140: } else { /*
1141: * If we get the lock we go in the non conflicting queue else we go in
1142: * the conflicting queue
1143: */
1144: if (!table.getLock().acquire(request)) {
1145: queueToUse = CONFLICTING_QUEUE;
1146: if (logger.isDebugEnabled())
1147: logger.debug("Request " + request
1148: + " waits for lock on table " + table);
1149: }
1150: if (tm != null)
1151: tm.addAcquiredLock(backend, table.getLock());
1152: else {
1153: if (lockList == null)
1154: lockList = new ArrayList();
1155: lockList.add(table.getLock());
1156: }
1157: }
1158: }
1159: return queueToUse;
1160: }
1161:
1162: /**
1163: * Release the locks acquired by a request executed in autocommit mode.
1164: *
1165: * @param locks the list of locks acquired by the request
1166: * @param transactionId the "fake" transaction id assign to the autocommit
1167: * request releasing the locks
1168: */
1169: private void releaseLocksForAutoCommitRequest(List locks,
1170: long transactionId) {
1171: if (locks == null)
1172: return; // No locks acquired
1173: for (Iterator iter = locks.iterator(); iter.hasNext();) {
1174: TransactionLogicalLock lock = (TransactionLogicalLock) iter
1175: .next();
1176: if (lock == null)
1177: logger.warn("Unexpected null lock for transaction "
1178: + transactionId + " when releasing "
1179: + locks.toArray());
1180: else
1181: lock.release(transactionId);
1182: }
1183: }
1184:
1185: /**
1186: * Release the locks held by the given transaction at commit/rollback time.
1187: *
1188: * @param transactionId the transaction releasing the locks
1189: */
1190: private void releaseLocksForTransaction(long transactionId) {
1191: try {
1192: TransactionMetaData tm = requestManager
1193: .getTransactionMetaData(new Long(transactionId));
1194: releaseLocksForAutoCommitRequest(tm
1195: .removeBackendLocks(backend), transactionId);
1196: } catch (SQLException e) {
1197: /*
1198: * this is expected to fail when replaying the recovery log, since the
1199: * request manager won't have any transaction metadatas for transactions
1200: * we are replaying => we don't log warnings in this case.
1201: */
1202: if (!backend.isReplaying())
1203: if (logger.isWarnEnabled())
1204: logger
1205: .warn("No transaction medatada found for transaction "
1206: + transactionId
1207: + " releasing locks manually");
1208: if (backend.getDatabaseSchema() != null)
1209: backend.getDatabaseSchema().releaseLocksOnAllTables(
1210: transactionId);
1211: else {
1212: /*
1213: * At this point, schema can be null, for example if the backend is down
1214: */
1215: if (logger.isWarnEnabled())
1216: logger
1217: .warn("Cannot release locks, as no schema is available on this backend. "
1218: + "This backend is problably not available anymore.");
1219: }
1220: }
1221: }
1222:
1223: private TransactionMetaData getTransactionMetaData(
1224: AbstractRequest request) {
1225: TransactionMetaData tm = null;
1226: if ((request != null) && !request.isAutoCommit()) { // Retrieve the transaction marker metadata
1227: try {
1228: tm = requestManager.getTransactionMetaData(new Long(
1229: request.getTransactionId()));
1230: } catch (SQLException e) {
1231: // We didn't start or lazy start the transaction
1232: if (logger.isDebugEnabled())
1233: logger
1234: .debug("No transaction medatada found for transaction "
1235: + request.getTransactionId());
1236: }
1237: }
1238: return tm;
1239: }
1240:
1241: /**
1242: * Notify the completion of the given entry. The corresponding task completion
1243: * is notified to the backend.
1244: *
1245: * @param entry the executed entry
1246: */
1247: public final void completedEntryExecution(
1248: BackendTaskQueueEntry entry) {
1249: completedEntryExecution(entry, null);
1250: }
1251:
1252: /**
1253: * Perform the cleanup to release locks and priority inversion checkings after
1254: * a stored procedure execution
1255: *
1256: * @param task the task that completed
1257: */
1258: public void completeStoredProcedureExecution(AbstractTask task) {
1259: AbstractRequest request = task.getRequest();
1260: long transactionId = request.getTransactionId();
1261: synchronized (atomicPostSyncObject) {
1262: if (request.isAutoCommit()) {
1263: releaseLocksForAutoCommitRequest(
1264: task.getLocks(backend), transactionId);
1265: checkForPriorityInversion();
1266: SortedSet writeLockedTables = request
1267: .getWriteLockedDatabaseTables();
1268: if ((writeLockedTables != null)
1269: && (writeLockedTables.size() > 1))
1270: writesWithMultipleLocks--;
1271: }
1272: storedProcedureInQueue--;
1273: }
1274: }
1275:
1276: /**
1277: * Perform the cleanup to release locks and priority inversion checkings after
1278: * a write query execution
1279: *
1280: * @param task the task that completed
1281: */
1282: public void completeWriteRequestExecution(AbstractTask task) {
1283: AbstractRequest request = task.getRequest();
1284: SortedSet writeLockedTables = request
1285: .getWriteLockedDatabaseTables();
1286: if ((writeLockedTables != null)
1287: && (writeLockedTables.size() > 1))
1288: synchronized (atomicPostSyncObject) {
1289: writesWithMultipleLocks--;
1290: }
1291:
1292: long transactionId = request.getTransactionId();
1293: if (request.isAutoCommit()) {
1294: synchronized (atomicPostSyncObject) {
1295: releaseLocksForAutoCommitRequest(
1296: task.getLocks(backend), transactionId);
1297: // Make sure we release the requests locking multiple tables or the
1298: // stored procedures that are blocked if any
1299: if (writesWithMultipleLocks > 0
1300: || waitForCompletionPolicy
1301: .isEnforceTableLocking())
1302: checkForPriorityInversion();
1303: else if (storedProcedureInQueue > 0)
1304: checkForPriorityInversion();
1305: }
1306: }
1307: }
1308:
1309: /**
1310: * Releasing locks and checking for priority inversion. Usually used at commit
1311: * or rollback completion time.
1312: *
1313: * @param tm the transaction metadata
1314: */
1315: public void releaseLocksAndCheckForPriorityInversion(
1316: TransactionMetaData tm) {
1317: synchronized (atomicPostSyncObject) {
1318: releaseLocksForTransaction(tm.getTransactionId());
1319: checkForPriorityInversion();
1320: }
1321: }
1322:
1323: /**
1324: * Removes the specified entry from its queue and notifies threads waiting on
1325: * this backend task queue. The removal is performed using the iterator, if
1326: * specified (non-null), or directly on the queue otherwize.
1327: *
1328: * @param entry the entry to remove from its queue
1329: * @param iter the iterator on which to call remove(), or null if not
1330: * applicable.
1331: */
1332: private void completedEntryExecution(BackendTaskQueueEntry entry,
1333: Iterator iter) {
1334: if (entry == null)
1335: return;
1336:
1337: // Notify the backend that this query execution is complete
1338: AbstractTask task = entry.getTask();
1339: if (!backend.removePendingTask(task))
1340: logger.warn("Unable to remove task " + task
1341: + " from pending request queue");
1342:
1343: synchronized (this ) {
1344: // Remove the entry from its queue
1345: LinkedList queue = entry.getQueue();
1346: synchronized (queue) {
1347: if (iter != null)
1348: iter.remove();
1349: else {
1350: if (!queue.remove(entry))
1351: logger.error("Failed to remove task " + task
1352: + " from " + queue);
1353: }
1354: }
1355:
1356: // Notify the queues to unblock queries waiting in getNextEntryToExecute
1357: // for the completion of the current request.
1358: this .notifyAll();
1359: }
1360: }
1361:
1362: /**
1363: * Return the first entry in the conflicting requests queue (does not remove
1364: * it from the list).
1365: *
1366: * @return the first entry in the conflicting queue
1367: */
1368: public final BackendTaskQueueEntry getFirstConflictingRequestQueueOrStoredProcedureQueueEntry() {
1369: synchronized (conflictingRequestsQueue) {
1370: if (conflictingRequestsQueue.isEmpty()) {
1371: synchronized (storedProcedureQueue) {
1372: if (storedProcedureQueue.isEmpty())
1373: return null;
1374: return (BackendTaskQueueEntry) storedProcedureQueue
1375: .getFirst();
1376: }
1377: }
1378: return (BackendTaskQueueEntry) conflictingRequestsQueue
1379: .getFirst();
1380: }
1381: }
1382:
1383: /**
1384: * Returns the stored procedure queue. This is needed for deadlock detection
1385: * but clearly does break the abstarction layer as it exposes a private field
1386: * in an un-controlled way.
1387: *
1388: * @return the stored procedure queue.
1389: */
1390: public List getStoredProcedureQueue() {
1391: return storedProcedureQueue;
1392: }
1393:
1394: /**
1395: * Get the next available task entry to process from the queues. If the
1396: * backend is killed, this method will return a KillThreadTask else it will
1397: * wait for a task to be ready to be executed. Note that the task is left in
1398: * the queue and flagged as processed by the thread given as a parameter. The
1399: * task will only be removed from the queue when the thread notifies the
1400: * completion of the task.
1401: *
1402: * @param thread the thread that will execute the task
1403: * @return the task to execute
1404: */
1405: public final BackendTaskQueueEntry getNextEntryToExecute(
1406: BackendWorkerThread thread) {
1407: BackendTaskQueueEntry entry = null;
1408:
1409: /*
1410: * The strategy is to look first for the non-conflicting queue so that
1411: * non-conflicting transactions could progress as fast as possible. Then we
1412: * check the conflicting queue if we did not find a task to execute.<p> If
1413: * we failed to find something to execute in the active queues, we process
1414: * everything available in the total order queue to push the tasks in the
1415: * active queues.
1416: */
1417:
1418: while (true) {
1419: Object firstNonConflictingTask = null;
1420: Object lastNonConflictingTask = null;
1421: // Check the non-conflicting queue first
1422: synchronized (nonConflictingRequestsQueue) {
1423: if (!nonConflictingRequestsQueue.isEmpty()) {
1424: firstNonConflictingTask = nonConflictingRequestsQueue
1425: .getFirst();
1426: lastNonConflictingTask = nonConflictingRequestsQueue
1427: .getLast();
1428: for (Iterator iter = nonConflictingRequestsQueue
1429: .iterator(); iter.hasNext();) {
1430: entry = (BackendTaskQueueEntry) iter.next();
1431: if (entry.getProcessingThread() == null) { // This task is not currently processed, let's execute it
1432: entry.setProcessingThread(thread);
1433: return entry;
1434: }
1435: }
1436: }
1437: }
1438:
1439: // Nothing to be executed now in the non-conflicting queue, check the
1440: // conflicting queue
1441: Object firstConflictingTask = null;
1442: Object lastConflictingTask = null;
1443: synchronized (conflictingRequestsQueue) {
1444: if (!conflictingRequestsQueue.isEmpty()) {
1445: firstConflictingTask = conflictingRequestsQueue
1446: .getFirst();
1447: lastConflictingTask = conflictingRequestsQueue
1448: .getLast();
1449: // Only check the first task since we must execute them only one at a
1450: // time.
1451: entry = (BackendTaskQueueEntry) conflictingRequestsQueue
1452: .getFirst();
1453: if (entry.getProcessingThread() == null) { // The task is not currently processed.
1454: AbstractRequest request = entry.getTask()
1455: .getRequest();
1456: SortedSet lockedTables = request
1457: .getWriteLockedDatabaseTables();
1458: if ((lockedTables != null)
1459: && (lockedTables.size() > 0)) {
1460: /**
1461: * Check if there are requests in the non-conflicting queue that
1462: * belongs to a transaction that is holding a lock on which we
1463: * conflict.
1464: * <p>
1465: * Note that if we need to lock multiple tables and that we are in
1466: * the conflicting queue, we are going to wait until all locks are
1467: * free or a deadlock detection occurs.
1468: */
1469: boolean conflictingQueryDetected = false;
1470: synchronized (nonConflictingRequestsQueue) {
1471: if (!nonConflictingRequestsQueue
1472: .isEmpty()
1473: || waitForCompletionPolicy
1474: .isEnforceTableLocking()) { // Check for a potential conflict
1475: int locksNotOwnedByMe = 0;
1476: long transactionId = entry
1477: .getTask()
1478: .getTransactionId();
1479: DatabaseSchema schema = backend
1480: .getDatabaseSchema();
1481: for (Iterator iterator = lockedTables
1482: .iterator(); iterator
1483: .hasNext()
1484: && !conflictingQueryDetected;) {
1485: String tableName = (String) iterator
1486: .next();
1487: DatabaseTable table = schema
1488: .getTable(tableName,
1489: false);
1490: if (table == null) { // No table found, let's go for the conflicting queue
1491: logger
1492: .warn("Unable to find table "
1493: + tableName
1494: + " in database schema, when getting next entry to execute : "
1495: + request
1496: .toStringShortForm(requestManager
1497: .getVirtualDatabase()
1498: .getSqlShortFormLength()));
1499:
1500: // Assume conflict since non-conflicting queue is not
1501: // empty
1502: conflictingQueryDetected = true;
1503: } else {
1504: TransactionLogicalLock lock = table
1505: .getLock();
1506: if (lock.isLocked()) {
1507: if (lock.getLocker() != transactionId)
1508: locksNotOwnedByMe++;
1509:
1510: /*
1511: * Check if we find a query in the conflicting queue
1512: * that owns the lock or waits for the lock we need
1513: */
1514: for (Iterator iter = nonConflictingRequestsQueue
1515: .iterator(); iter
1516: .hasNext();) {
1517: BackendTaskQueueEntry nonConflictingEntry = (BackendTaskQueueEntry) iter
1518: .next();
1519: long nonConflictingRequestTransactionId = nonConflictingEntry
1520: .getTask()
1521: .getTransactionId();
1522: if ((lock
1523: .getLocker() == nonConflictingRequestTransactionId)
1524: || lock
1525: .isWaiting(nonConflictingRequestTransactionId)) {
1526: conflictingQueryDetected = true;
1527: break;
1528: }
1529: }
1530: }
1531: }
1532: }
1533:
1534: /*
1535: * If table level locking is enforced, we don't allow a
1536: * request to execute before it has all its locks
1537: */
1538: if (waitForCompletionPolicy
1539: .isEnforceTableLocking())
1540: conflictingQueryDetected = locksNotOwnedByMe > 0;
1541:
1542: /*
1543: * If we don't own a single lock (in case of multiple locks)
1544: * needed by this query then we wait for the locks to be
1545: * released or the deadlock detection to abort a transaction
1546: * that is holding at least one of the locks that we need.
1547: */
1548: conflictingQueryDetected = conflictingQueryDetected
1549: || ((locksNotOwnedByMe > 1) && (locksNotOwnedByMe == lockedTables
1550: .size()));
1551: }
1552: }
1553:
1554: // If everyone is done in the non-conflicting queue, then
1555: // let's go with this conflicting request
1556: if (!conflictingQueryDetected) {
1557: entry.setProcessingThread(thread);
1558: return entry;
1559: }
1560: } else {
1561: if (logger.isWarnEnabled())
1562: logger
1563: .warn("Detected non-locking task "
1564: + entry.getTask()
1565: + " in conflicting queue");
1566:
1567: /*
1568: * No clue on where the conflict happens, it might well be that we
1569: * don't access any table but in that case we shouldn't have ended
1570: * up in the conflicting queue. To be safer, let's wait for the
1571: * non-conflicting queue to be empty.
1572: */
1573: synchronized (nonConflictingRequestsQueue) {
1574: if (nonConflictingRequestsQueue
1575: .isEmpty()) {
1576: entry.setProcessingThread(thread);
1577: return entry;
1578: }
1579: }
1580: }
1581: }
1582: }
1583: }
1584:
1585: synchronized (this ) {
1586: // No entry in the queues or all entries are currently processed,
1587: // process the total order queue.
1588: if (fetchNextQueryFromBackendTotalOrderQueue())
1589: continue;
1590:
1591: // Nothing in the total order queue either !
1592: // Double-check that something was not posted in the queue after we
1593: // scanned it
1594: synchronized (nonConflictingRequestsQueue) {
1595: if (!nonConflictingRequestsQueue.isEmpty()) {
1596: if (firstNonConflictingTask != nonConflictingRequestsQueue
1597: .getFirst())
1598: continue;
1599: if (lastNonConflictingTask != nonConflictingRequestsQueue
1600: .getLast())
1601: continue;
1602: } else if (firstNonConflictingTask != null)
1603: continue; // The queue was emptied all at once
1604: }
1605: synchronized (conflictingRequestsQueue) {
1606: if (!conflictingRequestsQueue.isEmpty()) {
1607: if (firstConflictingTask != conflictingRequestsQueue
1608: .getFirst())
1609: continue;
1610: if (lastConflictingTask != conflictingRequestsQueue
1611: .getLast())
1612: continue;
1613: } else if (firstConflictingTask != null)
1614: continue; // The queue was emptied all at once
1615: }
1616:
1617: // Wait until a new task is posted
1618: try {
1619: this .wait();
1620: } catch (InterruptedException ignore) {
1621: }
1622: }
1623:
1624: }
1625: }
1626:
1627: /**
1628: * Get the next available commit or rollback task to process from the queues.
1629: * If the backend is killed, this method will return a KillThreadTask else it
1630: * will wait for a task to be ready to be executed. Note that the task is left
1631: * in the queue and flagged as processed by the thread given as a parameter.
1632: * The task will only be removed from the queue when the thread notifies the
1633: * completion of the task.
1634: *
1635: * @param thread the thread that will execute the task
1636: * @return the commmit or rollback task to execute
1637: */
1638: public BackendTaskQueueEntry getNextCommitRollbackToExecute(
1639: BackendWorkerThread thread) {
1640: boolean found = false;
1641: BackendTaskQueueEntry entry = null;
1642: while (!found) {
1643: Object firstNonConflictingTask = null;
1644: Object lastNonConflictingTask = null;
1645: // Check the non-conflicting queue first
1646: synchronized (nonConflictingRequestsQueue) {
1647: if (!nonConflictingRequestsQueue.isEmpty()) {
1648: firstNonConflictingTask = nonConflictingRequestsQueue
1649: .getFirst();
1650: lastNonConflictingTask = nonConflictingRequestsQueue
1651: .getLast();
1652: for (Iterator iter = nonConflictingRequestsQueue
1653: .iterator(); iter.hasNext();) {
1654: entry = (BackendTaskQueueEntry) iter.next();
1655: if ((entry.isACommitOrRollback() || (entry
1656: .getTask() instanceof KillThreadTask))
1657: && (entry.getProcessingThread() == null)) { // This task is not currently processed, let's execute it
1658: entry.setProcessingThread(thread);
1659: return entry;
1660: }
1661: }
1662: }
1663: }
1664:
1665: synchronized (this ) {
1666: // No entry in the queues or all entries are currently processed,
1667: // process the total order queue.
1668: if (fetchNextQueryFromBackendTotalOrderQueue())
1669: continue;
1670:
1671: // Double-check that something was not posted in the queue after we
1672: // scanned it
1673: synchronized (nonConflictingRequestsQueue) {
1674: if (!nonConflictingRequestsQueue.isEmpty()) {
1675: if (firstNonConflictingTask != nonConflictingRequestsQueue
1676: .getFirst())
1677: continue;
1678: if (lastNonConflictingTask != nonConflictingRequestsQueue
1679: .getLast())
1680: continue;
1681: }
1682: }
1683:
1684: try {
1685: this .wait();
1686: } catch (InterruptedException ignore) {
1687: }
1688: }
1689:
1690: }
1691: // We should never reach this point
1692: return null;
1693: }
1694:
1695: /**
1696: * Checks if the current entry needs to wait for a later entry before being
1697: * able to execute.
1698: *
1699: * @param currentTask the current <code>AbstractTask</code> candidate for
1700: * scheduling
1701: * @return <code>true</code> if the current task needs to wait for a late
1702: * task before being able to execute, <code>false</code> else
1703: */
1704: private boolean mustWaitForLateTask(AbstractTask currentTask) {
1705: if (currentTask.isPersistentConnection()) {
1706: long currentCid = currentTask.getPersistentConnectionId();
1707: // Check if there are other requests for this transaction in
1708: // the queue
1709: if (hasTaskForPersistentConnectionInQueue(
1710: nonConflictingRequestsQueue, currentCid)
1711: || hasTaskForPersistentConnectionInQueue(
1712: conflictingRequestsQueue, currentCid)
1713: || hasTaskForPersistentConnectionInQueue(
1714: storedProcedureQueue, currentCid))
1715: // Skip this commit/rollback until the conflicting request completes
1716: return true;
1717: }
1718:
1719: if (!currentTask.isAutoCommit()) {
1720: long currentTid = currentTask.getTransactionId();
1721: // Check if there are other requests for this transaction in
1722: // the queue
1723: if (hasTaskForTransactionInQueue(
1724: nonConflictingRequestsQueue, currentTid)
1725: || hasTaskForTransactionInQueue(
1726: conflictingRequestsQueue, currentTid)
1727: || hasTaskForTransactionInQueue(
1728: storedProcedureQueue, currentTid))
1729: // Skip this commit/rollback until the conflicting request completes
1730: return true;
1731: }
1732:
1733: return hasDDLTaskInQueue(nonConflictingRequestsQueue)
1734: || hasDDLTaskInQueue(conflictingRequestsQueue)
1735: || hasDDLTaskInQueue(storedProcedureQueue);
1736: }
1737:
1738: private boolean hasDDLTaskInQueue(List queue) {
1739: boolean retry;
1740: do {
1741: retry = false;
1742: try {
1743: for (Iterator iter = queue.iterator(); iter.hasNext();) {
1744: BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1745: .next();
1746: AbstractTask otherTask = otherEntry.getTask();
1747: AbstractRequest request = otherTask.getRequest();
1748: /**
1749: * For the moment just check if this is a create, drop or alter
1750: * statement, we could also check
1751: * AbstractRequest#altersDatabaseSchema() but we don't want to block
1752: * if this is not a DDL (a stored procedure might alter the schema
1753: * because of its default semantic but still might need other queries
1754: * to be executed before it can really execute).
1755: */
1756: if ((request != null)
1757: && (request.isCreate() || request.isAlter() || request
1758: .isDrop())) {
1759: return true;
1760: }
1761: }
1762: } catch (ConcurrentModificationException e) {
1763: retry = true;
1764: }
1765: } while (retry);
1766: return false;
1767: }
1768:
1769: private boolean hasTaskForPersistentConnectionInQueue(List queue,
1770: long cid) {
1771: boolean retry;
1772: do {
1773: retry = false;
1774: try {
1775: for (Iterator iter = queue.iterator(); iter.hasNext();) {
1776: BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1777: .next();
1778:
1779: AbstractTask otherTask = otherEntry.getTask();
1780:
1781: // Check if the query is in the same transaction
1782: if (otherTask.isPersistentConnection()
1783: && (otherTask.getPersistentConnectionId() == cid)) {
1784: return true;
1785: }
1786: }
1787: } catch (ConcurrentModificationException e) {
1788: retry = true;
1789: }
1790: } while (retry);
1791: return false;
1792: }
1793:
1794: private boolean hasTaskForTransactionInQueue(List queue, long tid) {
1795: boolean retry;
1796: do {
1797: retry = false;
1798: try {
1799: for (Iterator iter = queue.iterator(); iter.hasNext();) {
1800: BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1801: .next();
1802:
1803: AbstractTask otherTask = otherEntry.getTask();
1804:
1805: // Check if the query is in the same transaction
1806: if (!otherTask.isAutoCommit()
1807: && (otherTask.getTransactionId() == tid)) {
1808: return true;
1809: }
1810: }
1811: } catch (ConcurrentModificationException e) {
1812: retry = true;
1813: }
1814: } while (retry);
1815: return false;
1816: }
1817:
1818: /**
1819: * Return true if tasks are allowed to be posted to the queue. If false, all
1820: * tasks posted to the queue are systematically notified for completion
1821: * without being executed (abort behavior)
1822: *
1823: * @return Returns the allowTasksToBePosted.
1824: */
1825: public boolean allowTasksToBePosted() {
1826: synchronized (ALLOW_TASKS_SYNC) {
1827: return allowTasksToBePosted;
1828: }
1829: }
1830:
1831: /**
1832: * Set to true if tasks are allowed to be posted to the queue else, all tasks
1833: * posted to the queue are systematically notified for completion without
1834: * being executed (abort behavior)
1835: *
1836: * @param allowTasksToBePosted The allowTasksToBePosted to set.
1837: */
1838: public void setAllowTasksToBePosted(boolean allowTasksToBePosted) {
1839: synchronized (ALLOW_TASKS_SYNC) {
1840: this .allowTasksToBePosted = allowTasksToBePosted;
1841: }
1842: }
1843:
1844: /**
1845: * Start a new Deadlock Detection Thread (throws a RuntimeException if called
1846: * twice without stopping the thread before the second call).
1847: *
1848: * @param vdb the virtual database the backend is attached to
1849: */
1850: public void startDeadlockDetectionThread(VirtualDatabase vdb) {
1851: if (deadlockDetectionThread != null)
1852: throw new RuntimeException(
1853: "Trying to start multiple times a deadlock detection thread on the same backend "
1854: + backend.getName());
1855:
1856: deadlockDetectionThread = new DeadlockDetectionThread(backend,
1857: vdb, atomicPostSyncObject, waitForCompletionPolicy
1858: .getDeadlockTimeoutInMs());
1859: deadlockDetectionThread.start();
1860: }
1861:
1862: /**
1863: * Terminate the Deadlock Detection Thread. Throws a RuntimeException is the
1864: * thread was already stopped (or not started).
1865: */
1866: public void terminateDeadlockDetectionThread() {
1867: if (deadlockDetectionThread == null)
1868: throw new RuntimeException(
1869: "No deadlock detection thread to stop on backend "
1870: + backend.getName());
1871:
1872: deadlockDetectionThread.kill();
1873: deadlockDetectionThread = null;
1874: }
1875:
1876: /**
1877: * Returns a <code>String</code> corresponding to the dump of the internal
1878: * state of this BackendTaskQueues.<br />
1879: * This method is synchronized to provided a consistent snapshots of the
1880: * queues.
1881: *
1882: * @return a <code>String</code> representing the internal state of this
1883: * BackendTaskQueues
1884: */
1885: protected synchronized String dump() {
1886: StringBuffer buff = new StringBuffer();
1887: buff.append("Non Conflicting Requests Queue ("
1888: + nonConflictingRequestsQueue.size() + ")\n");
1889: for (Iterator iter = nonConflictingRequestsQueue.iterator(); iter
1890: .hasNext();) {
1891: BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
1892: .next();
1893: buff.append("\t" + entry + "\n");
1894: }
1895: buff.append("Conflicting Requests Queue ("
1896: + conflictingRequestsQueue.size() + ")\n");
1897: for (Iterator iter = conflictingRequestsQueue.iterator(); iter
1898: .hasNext();) {
1899: BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
1900: .next();
1901: buff.append("\t" + entry + "\n");
1902: }
1903: buff.append("Stored Procedures Queue ("
1904: + storedProcedureQueue.size() + ")\n");
1905: for (Iterator iter = storedProcedureQueue.iterator(); iter
1906: .hasNext();) {
1907: BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
1908: .next();
1909: buff.append("\t" + entry + "\n");
1910: }
1911: return buff.toString();
1912: }
1913: }
|