Source Code Cross Referenced for BackendTaskQueues.java in  » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » loadbalancer » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.loadbalancer 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0004:         * Copyright (C) 2005-2006 Continuent, Inc.
0005:         * Contact: sequoia@continuent.org
0006:         *
0007:         * Licensed under the Apache License, Version 2.0 (the "License");
0008:         * you may not use this file except in compliance with the License.
0009:         * You may obtain a copy of the License at
0010:         *
0011:         * http://www.apache.org/licenses/LICENSE-2.0
0012:         *
0013:         * Unless required by applicable law or agreed to in writing, software
0014:         * distributed under the License is distributed on an "AS IS" BASIS,
0015:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0016:         * See the License for the specific language governing permissions and
0017:         * limitations under the License.
0018:         *
0019:         * Initial developer(s): Emmanuel Cecchet.
0020:         * Contributor(s): ______________________.
0021:         */package org.continuent.sequoia.controller.loadbalancer;
0022:
0023:        import java.sql.SQLException;
0024:        import java.sql.Statement;
0025:        import java.util.ArrayList;
0026:        import java.util.ConcurrentModificationException;
0027:        import java.util.Iterator;
0028:        import java.util.LinkedList;
0029:        import java.util.List;
0030:        import java.util.SortedSet;
0031:
0032:        import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0033:        import org.continuent.sequoia.common.locks.DeadlockDetectionThread;
0034:        import org.continuent.sequoia.common.locks.TransactionLogicalLock;
0035:        import org.continuent.sequoia.common.log.Trace;
0036:        import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0037:        import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0038:        import org.continuent.sequoia.common.sql.schema.DatabaseTable;
0039:        import org.continuent.sequoia.controller.backend.DatabaseBackend;
0040:        import org.continuent.sequoia.controller.connection.PooledConnection;
0041:        import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0042:        import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0043:        import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
0044:        import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
0045:        import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0046:        import org.continuent.sequoia.controller.requestmanager.RequestManager;
0047:        import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0048:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0049:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0050:        import org.continuent.sequoia.controller.requests.CreateRequest;
0051:        import org.continuent.sequoia.controller.requests.ParsingGranularities;
0052:        import org.continuent.sequoia.controller.requests.SelectRequest;
0053:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0054:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0055:
0056:        /**
0057:         * This class defines task queues that store the requests to be executed on a
0058:         * database backend.
0059:         *
0060:         * @author <a href="mailto:emmanuel.cecchet@emicnetworks.com">Emmanuel Cecchet</a>
0061:         * @version 1.0
0062:         */
0063:        public class BackendTaskQueues {
0064:            /** Queue in which queries arrive in total order */
0065:            private LinkedList totalOrderQueue;
0066:            /**
0067:             * Queue for stored procedures without semantic information (locking the whole
0068:             * database)
0069:             */
0070:            private LinkedList storedProcedureQueue;
0071:            /**
0072:             * Queue for conflicting requests (only first request of the queue can be
0073:             * executed)
0074:             */
0075:            private LinkedList conflictingRequestsQueue;
0076:            /**
0077:             * Queue for non-conflicting requests that can be executed in parallel, in any
0078:             * order.
0079:             */
0080:            private LinkedList nonConflictingRequestsQueue;
0081:            /** Backend these queues are attached to */
0082:            private DatabaseBackend backend;
0083:            private WaitForCompletionPolicy waitForCompletionPolicy;
0084:            private RequestManager requestManager;
0085:            private boolean allowTasksToBePosted;
0086:            private final Object ALLOW_TASKS_SYNC = new Object();
0087:
0088:            private DeadlockDetectionThread deadlockDetectionThread;
0089:
0090:            // Number of stored procedures that have been posted in the queue and that
0091:            // have not completed yet
0092:            private int storedProcedureInQueue = 0;
0093:
0094:            private int writesWithMultipleLocks = 0;
0095:            private Trace logger;
0096:
/**
 * Builds the set of task queues attached to one backend. All four queues
 * start empty and task posting starts disabled.
 *
 * @param backend DatabaseBackend associated to these queues; its logger is
 *          reused by this object
 * @param waitForCompletionPolicy the load balancer wait for completion policy
 * @param requestManager the request manager associated with these queues
 */
public BackendTaskQueues(DatabaseBackend backend,
        WaitForCompletionPolicy waitForCompletionPolicy,
        RequestManager requestManager) {
    // Collaborators handed in by the caller
    this.backend = backend;
    this.logger = backend.getLogger();
    this.waitForCompletionPolicy = waitForCompletionPolicy;
    this.requestManager = requestManager;

    // One empty list per queue
    this.totalOrderQueue = new LinkedList();
    this.storedProcedureQueue = new LinkedList();
    this.conflictingRequestsQueue = new LinkedList();
    this.nonConflictingRequestsQueue = new LinkedList();

    // Posting must be enabled explicitly later on
    this.allowTasksToBePosted = false;
}
0117:
0118:            /**
0119:             * Abort all queries belonging to the provided transaction.
0120:             *
0121:             * @param tid the transaction identifier
0122:             * @return true if a rollback is already in progress
0123:             */
0124:            public boolean abortAllQueriesForTransaction(long tid) {
0125:                synchronized (this ) {
0126:                    boolean rollbackInProgress = abortAllQueriesEvenRunningInTransaction(
0127:                            tid, storedProcedureQueue);
0128:                    if (abortAllQueriesEvenRunningInTransaction(tid,
0129:                            conflictingRequestsQueue))
0130:                        rollbackInProgress = true;
0131:                    if (abortAllQueriesEvenRunningInTransaction(tid,
0132:                            nonConflictingRequestsQueue))
0133:                        rollbackInProgress = true;
0134:                    return rollbackInProgress;
0135:                }
0136:            }
0137:
0138:            /**
0139:             * Abort all queries belonging to the given transaction even if they are
0140:             * currently processed by a BackendWorkerThread.
0141:             *
0142:             * @param tid transaction identifier
0143:             * @param queue queue to scan for queries to abort
0144:             * @return true if a rollback is already in progress
0145:             */
0146:            private boolean abortAllQueriesEvenRunningInTransaction(long tid,
0147:                    LinkedList queue) {
0148:                boolean rollbackInProgress = false;
0149:                synchronized (queue) {
0150:                    Long lTid = new Long(tid);
0151:                    for (Iterator iter = queue.iterator(); iter.hasNext();) {
0152:                        BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0153:                                .next();
0154:                        boolean isProcessing = false;
0155:                        AbstractTask task = entry.getTask();
0156:                        if (task.getTransactionId() == tid) {
0157:                            if (task instanceof  RollbackTask)
0158:                                rollbackInProgress = true;
0159:                            else { /*
0160:                             * If we cancel a task in a transaction that was supposed to be
0161:                             * lazily started by that task, then we have to fake the transaction
0162:                             * start on this backend so that the transaction rollback and
0163:                             * subsequent check for priority inversion occurs on that backend.
0164:                             */
0165:                                if (!task.isAutoCommit()
0166:                                        && !backend.isStartedTransaction(lTid))
0167:                                    backend.startTransaction(lTid);
0168:
0169:                                if (logger.isDebugEnabled())
0170:                                    logger.debug("Aborting request "
0171:                                            + task.getRequest()
0172:                                            + " on backend "
0173:                                            + backend.getName());
0174:
0175:                                BackendWorkerThread processingThread = entry
0176:                                        .getProcessingThread();
0177:                                if (processingThread != null) { // A thread is working on it, cancel the task
0178:                                    isProcessing = true;
0179:                                    Statement s = processingThread
0180:                                            .getCurrentStatement();
0181:                                    if (s != null) {
0182:                                        try {
0183:                                            s.cancel();
0184:                                        } catch (SQLException e) {
0185:                                            logger
0186:                                                    .warn(
0187:                                                            "Unable to cancel execution of request",
0188:                                                            e);
0189:                                        } catch (NullPointerException e) {
0190:                                            if (logger.isWarnEnabled())
0191:                                                logger
0192:                                                        .warn(
0193:                                                                "Ignoring NullPointerException caused by Connector/J 5.0.4 bug #24721",
0194:                                                                e);
0195:                                        }
0196:                                    }
0197:                                }
0198:                                if (!task.hasCompleted()) {
0199:                                    try {
0200:                                        if (processingThread == null) { // abort has been called on a non-processed query, use a
0201:                                            // random worker thread for notification
0202:                                            processingThread = backend
0203:                                                    .getBackendWorkerThreadForNotification();
0204:                                            if (processingThread == null) { // No worker thread left, should never happen.
0205:                                                // Backend already disabled?
0206:                                                logger
0207:                                                        .warn("No worker thread found for request abort notification, creating fake worker thread");
0208:                                                processingThread = new BackendWorkerThread(
0209:                                                        backend,
0210:                                                        requestManager
0211:                                                                .getLoadBalancer());
0212:                                            }
0213:                                        }
0214:                                        task
0215:                                                .notifyFailure(
0216:                                                        processingThread,
0217:                                                        -1L,
0218:                                                        new SQLException(
0219:                                                                "Transaction aborted due to deadlock"));
0220:                                    } catch (SQLException ignore) {
0221:                                    }
0222:                                }
0223:                                if (!isProcessing) {
0224:                                    /*
0225:                                     * If the task was being processed by a thread, the completion
0226:                                     * will be notified by the thread itself
0227:                                     */
0228:                                    completedEntryExecution(entry, iter);
0229:                                }
0230:                            }
0231:                        }
0232:                    }
0233:                }
0234:                return rollbackInProgress;
0235:            }
0236:
0237:            /**
0238:             * Abort all requests remaining in the queues. This is usually called when the
0239:             * backend is disabled and no backend worker thread should be processing any
0240:             * request any more (this will generate a warning otherwise).
0241:             */
0242:            public void abortRemainingRequests() {
0243:                setAllowTasksToBePosted(false);
0244:                abortRemainingRequests(storedProcedureQueue);
0245:                abortRemainingRequests(conflictingRequestsQueue);
0246:                abortRemainingRequests(nonConflictingRequestsQueue);
0247:            }
0248:
0249:            /**
0250:             * Abort the remaining request in the given queue
0251:             *
0252:             * @param queue the queue to purge
0253:             */
0254:            private void abortRemainingRequests(LinkedList queue) {
0255:                synchronized (this ) {
0256:                    synchronized (queue) {
0257:                        for (Iterator iter = queue.iterator(); iter.hasNext();) {
0258:                            BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0259:                                    .next();
0260:                            AbstractTask task = entry.getTask();
0261:
0262:                            // Do not cancel KillThreadTasks
0263:                            if (task instanceof  KillThreadTask)
0264:                                continue;
0265:
0266:                            if (entry.getProcessingThread() != null) { // A thread is working on it, warn and cancel the task
0267:                                logger
0268:                                        .warn("A worker thread was still processing task "
0269:                                                + task
0270:                                                + ", aborting the request execution.");
0271:                                Statement s = entry.getProcessingThread()
0272:                                        .getCurrentStatement();
0273:                                if (s != null) {
0274:                                    try {
0275:                                        s.cancel();
0276:                                    } catch (SQLException e) {
0277:                                        logger
0278:                                                .warn(
0279:                                                        "Unable to cancel execution of request",
0280:                                                        e);
0281:                                    }
0282:                                }
0283:                            }
0284:                            if (!task.hasCompleted()) {
0285:                                if (logger.isDebugEnabled())
0286:                                    logger.debug("Cancelling task " + task);
0287:                                task.notifyCompletion(entry
0288:                                        .getProcessingThread());
0289:                            }
0290:                            completedEntryExecution(entry, iter);
0291:                        }
0292:                    }
0293:                }
0294:            }
0295:
0296:            /**
0297:             * Add a task at the end of the backend total order queue
0298:             *
0299:             * @param task the task to add
0300:             */
0301:            public final void addTaskToBackendTotalOrderQueue(AbstractTask task) {
0302:                synchronized (this ) {
0303:                    synchronized (totalOrderQueue) {
0304:                        totalOrderQueue.addLast(task);
0305:                    }
0306:
0307:                    /*
0308:                     * Wake up all worker threads in case we post multiple tasks before a
0309:                     * thread had time to take this task into account (this would result in a
0310:                     * lost notified event).
0311:                     */
0312:                    this .notifyAll();
0313:                }
0314:            }
0315:
0316:            /**
0317:             * Add a task at the end of the backend total order queue. Block as long as
0318:             * the total order queue size if over the indicated queue size.
0319:             *
0320:             * @param task the task to add
0321:             * @param queueSize the maximum queue size
0322:             */
0323:            public final void addTaskToBackendTotalOrderQueue(
0324:                    AbstractTask task, int queueSize) {
0325:                synchronized (this ) {
0326:                    boolean mustNotify = false;
0327:                    do {
0328:                        synchronized (totalOrderQueue) {
0329:                            if (totalOrderQueue.size() < queueSize) {
0330:                                totalOrderQueue.addLast(task);
0331:                                mustNotify = true;
0332:                            }
0333:                        }
0334:
0335:                        if (mustNotify) {
0336:                            /*
0337:                             * Wake up all worker threads in case we post multiple tasks before a
0338:                             * thread had time to take this task into account (this would result
0339:                             * in a lost notified event).
0340:                             */
0341:                            this .notifyAll();
0342:                            return; // exit method here
0343:                        } else {
0344:                            try { // Wait for queue to free an entry
0345:                                this .wait();
0346:                            } catch (InterruptedException e) {
0347:                            }
0348:                        }
0349:                    } while (!mustNotify);
0350:                }
0351:            }
0352:
0353:            /**
0354:             * Add a task in the ConflictingRequestsQueue.
0355:             *
0356:             * @param task task to add
0357:             */
0358:            private void addTaskInConflictingRequestsQueue(AbstractTask task) {
0359:                addTaskToQueue(conflictingRequestsQueue, task, false);
0360:            }
0361:
0362:            /**
0363:             * Add a task in the NonConflictingRequestsQueue.
0364:             *
0365:             * @param task task to add
0366:             * @param isACommitOrRollback true if the task is a commit or a rollback
0367:             */
0368:            private void addTaskInNonConflictingRequestsQueue(
0369:                    AbstractTask task, boolean isACommitOrRollback) {
0370:                addTaskToQueue(nonConflictingRequestsQueue, task,
0371:                        isACommitOrRollback);
0372:            }
0373:
0374:            /**
0375:             * Add a task in the StoredProcedureQueue.
0376:             *
0377:             * @param task task to add
0378:             */
0379:            private void addTaskInStoredProcedureQueue(AbstractTask task) {
0380:                addTaskToQueue(storedProcedureQueue, task, false);
0381:            }
0382:
0383:            /**
0384:             * Add the task in the given queue and notify the queue. Note that the task is
0385:             * also added to the backend pending write request queue. addTaskToQueue
0386:             *
0387:             * @param queue queue in which the task must be added
0388:             * @param task the task to add
0389:             * @param isACommitOrRollback true if the task is a commit or a rollback
0390:             */
0391:            private void addTaskToQueue(LinkedList queue, AbstractTask task,
0392:                    boolean isACommitOrRollback) {
0393:                if (!allowTasksToBePosted()) {
0394:                    if (logger.isDebugEnabled())
0395:                        logger.debug("Cancelling task " + task);
0396:                    task.notifyCompletion(null);
0397:                    return;
0398:                }
0399:
0400:                // We assume that all requests here are writes
0401:                backend.addPendingTask(task);
0402:                if (logger.isDebugEnabled())
0403:                    logger.debug("Adding task " + task
0404:                            + " to pending request queue");
0405:
0406:                synchronized (this ) {
0407:                    // Add to the queue
0408:                    synchronized (queue) {
0409:                        queue.addLast(new BackendTaskQueueEntry(task, queue,
0410:                                isACommitOrRollback));
0411:                    }
0412:
0413:                    /*
0414:                     * Wake up all worker threads in case we post multiple tasks before a
0415:                     * thread had time to take this task into account (this would result in a
0416:                     * lost notified event).
0417:                     */
0418:                    this .notifyAll();
0419:                }
0420:            }
0421:
0422:            /**
0423:             * Check for priority inversion in the conflicting queue or possibly stored
0424:             * procedure queue flushing if a transaction executing a stored procedure has
0425:             * just completed. remove this entry and possibly re-arrange the queues
0426:             */
0427:            public final void checkForPriorityInversion() {
0428:                DatabaseSchema schema = backend.getDatabaseSchema();
0429:
0430:                // Let's check the conflicting queue for priority inversion
0431:                synchronized (conflictingRequestsQueue) {
0432:                    for (Iterator iter = conflictingRequestsQueue.iterator(); iter
0433:                            .hasNext();) {
0434:                        BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0435:                                .next();
0436:
0437:                        // If the entry is currently processed, don't try to move it else it
0438:                        // would be duplicated in the non-conflicting queue!!!
0439:                        if (entry.processingThread != null)
0440:                            continue;
0441:
0442:                        AbstractTask task = entry.getTask();
0443:                        if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) {
0444:                            if (task.getSuccess() + task.getFailed() > 0) { // Task has already been started by other nodes, just proceed with
0445:                                // it, we are late!
0446:                                if (logger.isDebugEnabled())
0447:                                    logger
0448:                                            .debug("Priority inversion for already started request "
0449:                                                    + task.getRequest());
0450:                                moveToNonConflictingQueue(iter, entry);
0451:                                continue;
0452:                            }
0453:                        }
0454:
0455:                        AbstractRequest request = task.getRequest();
0456:                        SortedSet lockedTables = request
0457:                                .getWriteLockedDatabaseTables();
0458:                        if (lockedTables != null) {
0459:                            boolean queryIsConflicting = false;
0460:                            for (Iterator iterator = lockedTables.iterator(); iterator
0461:                                    .hasNext()
0462:                                    && !queryIsConflicting;) {
0463:                                String tableName = (String) iterator.next();
0464:                                DatabaseTable table = schema.getTable(
0465:                                        tableName, false);
0466:                                if (table == null) { // No table found, stay in the conflicting queue
0467:                                    logger
0468:                                            .warn("Unable to find table "
0469:                                                    + tableName
0470:                                                    + " in database schema, when checking priority inversion for query "
0471:                                                    + request
0472:                                                            .toStringShortForm(requestManager
0473:                                                                    .getVirtualDatabase()
0474:                                                                    .getSqlShortFormLength()));
0475:                                } else {
0476:                                    /*
0477:                                     * If the table we are conflicting with now belongs to us then we
0478:                                     * can go in the non-conflicting queue. Note that it is not
0479:                                     * possible for the lock to be free since we have acquired it
0480:                                     * earlier and we are waiting for our turn.
0481:                                     */
0482:                                    TransactionLogicalLock lock = table
0483:                                            .getLock();
0484:                                    if (!lock.isLocked())
0485:                                        logger
0486:                                                .warn("Unexpected free lock on table "
0487:                                                        + table);
0488:                                    else { // Check that the lock belong to our transaction
0489:                                        queryIsConflicting = lock.getLocker() != task
0490:                                                .getTransactionId();
0491:                                    }
0492:                                }
0493:                            }
0494:                            if (!queryIsConflicting) { // Locks are now free, move to the non-conflicting queue
0495:                                // Do not try to take the lock again else it will not be released
0496:                                if (logger.isDebugEnabled())
0497:                                    logger
0498:                                            .debug("Priority inversion for request "
0499:                                                    + task.getRequest());
0500:                                moveToNonConflictingQueue(iter, entry);
0501:                            }
0502:                        } else { // Query does not lock anything, it should not have been posted in the
0503:                            // conflicting queue
0504:                            logger.warn("Non-locking task " + task
0505:                                    + " was posted in conflicting queue");
0506:                            if (logger.isDebugEnabled())
0507:                                logger.debug("Priority inversion for request "
0508:                                        + task.getRequest());
0509:                            moveToNonConflictingQueue(iter, entry);
0510:                        }
0511:                    }
0512:                }
0513:
0514:                // Look at the stored procedure queue
0515:                synchronized (storedProcedureQueue) {
0516:                    for (Iterator iter = storedProcedureQueue.iterator(); iter
0517:                            .hasNext();) {
0518:                        BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
0519:                                .next();
0520:
0521:                        TransactionLogicalLock globalLock = schema.getLock();
0522:                        AbstractTask task = entry.getTask();
0523:                        AbstractRequest request = task.getRequest();
0524:                        if (globalLock.isLocked()) { // Stored procedure is executing
0525:                            if (task.getTransactionId() == globalLock
0526:                                    .getLocker()) {
0527:                                // Just wait for current transactions to complete if all locks are
0528:                                // not free.
0529:                                if (!schema
0530:                                        .allTablesAreUnlockedOrLockedByTransaction(request))
0531:                                    return;
0532:
0533:                                /*
0534:                                 * We belong to the transaction that executes the stored procedure
0535:                                 * (or to the auto commit request that holds the lock), let's go in
0536:                                 * the non-conflicting request queue.
0537:                                 */
0538:                                moveToNonConflictingQueue(iter, entry);
0539:                                // if we are in auto commit, it means that we are the stored
0540:                                // procedure which has acquired the lock during the atomic post
0541:                                if (task.isAutoCommit())
0542:                                    return;
0543:                                continue;
0544:                            } else { // Check if the stored procedure currently executing is not
0545:                                // somewhere in the stored procedure queue.
0546:
0547:                                boolean currentStoredProcedureInQueue = false;
0548:                                for (Iterator iter2 = storedProcedureQueue
0549:                                        .iterator(); iter2.hasNext();) {
0550:                                    BackendTaskQueueEntry entry2 = (BackendTaskQueueEntry) iter2
0551:                                            .next();
0552:                                    AbstractTask task2 = entry2.getTask();
0553:                                    if ((task2 != null)
0554:                                            && (task2.getTransactionId() == globalLock
0555:                                                    .getLocker()))
0556:                                        currentStoredProcedureInQueue = true;
0557:                                }
0558:
0559:                                // If the stored procedure is not in the queue then it is currently
0560:                                // executing and we have to wait for its completion
0561:                                if (!currentStoredProcedureInQueue)
0562:                                    return;
0563:                            }
0564:                        }
0565:
0566:                        // Schema is not locked, no stored procedure currently executes
0567:                        TransactionMetaData tm = getTransactionMetaData(request);
0568:
0569:                        if ((request instanceof  SelectRequest)
0570:                                || (request instanceof  AbstractWriteRequest)) {
0571:                            SortedSet writeLockedTables = request
0572:                                    .getWriteLockedDatabaseTables();
0573:
0574:                            if (writeLockedTables == null
0575:                                    || writeLockedTables.isEmpty()) { // This request does not lock anything
0576:                                moveToNonConflictingQueue(iter, entry);
0577:                                continue;
0578:                            }
0579:
0580:                            moveMultipleWriteLocksQuery(schema, iter, entry,
0581:                                    task, request, tm);
0582:                        } else {
0583:                            if (request instanceof  StoredProcedure) {
0584:                                StoredProcedure sp = (StoredProcedure) request;
0585:                                DatabaseProcedureSemantic semantic = sp
0586:                                        .getSemantic();
0587:                                if (semantic != null) {
0588:                                    // Try to optimize the stored procedure execution based on its
0589:                                    // semantic information
0590:                                    if (semantic.isCommutative()
0591:                                            || semantic.isReadOnly()
0592:                                            || (request
0593:                                                    .getWriteLockedDatabaseTables() == null))
0594:                                        moveToNonConflictingQueue(iter, entry);
0595:                                    else
0596:                                        moveMultipleWriteLocksQuery(schema,
0597:                                                iter, entry, task, request, tm);
0598:                                    continue;
0599:                                }
0600:                            }
0601:
0602:                            // Stored procedure or unknown query, take the global lock and proceed
0603:                            // if all other locks are free.
0604:
0605:                            globalLock.acquire(request);
0606:                            if (tm != null) {
0607:                                List acquiredLocks = tm
0608:                                        .getAcquiredLocks(backend);
0609:                                if ((acquiredLocks == null)
0610:                                        || !acquiredLocks.contains(globalLock))
0611:                                    tm.addAcquiredLock(backend, globalLock);
0612:                            } else {
0613:                                ArrayList globalLockList = new ArrayList();
0614:                                globalLockList.add(globalLock);
0615:                                task.setLocks(backend, globalLockList);
0616:                            }
0617:
0618:                            // Just wait for current transactions to complete if all locks are not
0619:                            // free.
0620:                            if (!schema
0621:                                    .allTablesAreUnlockedOrLockedByTransaction(request))
0622:                                return;
0623:
0624:                            // Clear to go, all locks are free. Acquire the global lock and move
0625:                            // to the non-conflicting queue.
0626:                            moveToNonConflictingQueue(iter, entry);
0627:                            continue;
0628:                        }
0629:                    }
0630:                }
0631:            }
0632:
0633:            private void moveMultipleWriteLocksQuery(DatabaseSchema schema,
0634:                    Iterator iter, BackendTaskQueueEntry entry,
0635:                    AbstractTask task, AbstractRequest request,
0636:                    TransactionMetaData tm) {
0637:                /*
0638:                 * Assume that we will get all locks and that we will execute in the
0639:                 * non-conflicting queue. If there is any issue, the queue will be set to
0640:                 * conflicting queue.
0641:                 */
0642:                boolean allLocksAcquired = true;
0643:                for (Iterator lockIter = request.getWriteLockedDatabaseTables()
0644:                        .iterator(); lockIter.hasNext();) {
0645:                    String tableName = (String) lockIter.next();
0646:                    DatabaseTable table = schema.getTable(tableName, false);
0647:                    if (table == null) { // No table found, let's go for the conflicting queue
0648:                        logger.warn("Unable to find table "
0649:                                + tableName
0650:                                + " in database schema, scheduling query "
0651:                                + request.toStringShortForm(requestManager
0652:                                        .getVirtualDatabase()
0653:                                        .getSqlShortFormLength())
0654:                                + " in conflicting queue.");
0655:                        allLocksAcquired = false;
0656:                    } else { /*
0657:                     * If we get the lock we go in the non conflicting queue else we go in
0658:                     * the conflicting queue
0659:                     */
0660:                        TransactionLogicalLock tableLock = table.getLock();
0661:                        if (!tableLock.acquire(request))
0662:                            allLocksAcquired = false;
0663:                        /*
0664:                         * Make sure that the lock is added only once to the list especially if
0665:                         * multiple backends execute this piece of code when checking for
0666:                         * priority inversion in their own queue (if the lock was already
0667:                         * acquired, tableLock.acquire() returns directly true)
0668:                         */
0669:                        if (tm != null) {
0670:                            List acquiredLocks = tm.getAcquiredLocks(backend);
0671:                            if ((acquiredLocks == null)
0672:                                    || !acquiredLocks.contains(tableLock))
0673:                                tm.addAcquiredLock(backend, tableLock);
0674:                        } else {
0675:                            List tableLockList = task.getLocks(backend);
0676:                            if (tableLockList == null)
0677:                                tableLockList = new ArrayList();
0678:                            // There is no need to synchronize on task.getLocks because we are in
0679:                            // mutual exclusion here in a synchronized block on synchronized
0680:                            // (atomicPostSyncObject) that has been taken by the caller of this
0681:                            // method.
0682:                            if (!tableLockList.contains(tableLock)) {
0683:                                tableLockList.add(tableLock);
0684:                                task.setLocks(backend, tableLockList);
0685:                            }
0686:                        }
0687:                    }
0688:                }
0689:                // if we acquired all locks, we can go to the non conflicting queue
0690:                if (allLocksAcquired)
0691:                    moveToNonConflictingQueue(iter, entry);
0692:                else
0693:                    moveToConflictingQueue(iter, entry);
0694:            }
0695:
0696:            private void moveToConflictingQueue(Iterator iter,
0697:                    BackendTaskQueueEntry entry) {
0698:                iter.remove();
0699:                if (logger.isDebugEnabled())
0700:                    logger.debug("Moving " + entry.getTask()
0701:                            + " to conflicting queue");
0702:                synchronized (conflictingRequestsQueue) {
0703:                    entry.setQueue(conflictingRequestsQueue);
0704:                    conflictingRequestsQueue.addLast(entry);
0705:                }
0706:            }
0707:
0708:            private void moveToNonConflictingQueue(Iterator iter,
0709:                    BackendTaskQueueEntry entry) {
0710:                iter.remove();
0711:                if (logger.isDebugEnabled())
0712:                    logger.debug("Moving " + entry.getTask()
0713:                            + " to non conflicting queue");
0714:                synchronized (nonConflictingRequestsQueue) {
0715:                    entry.setQueue(nonConflictingRequestsQueue);
0716:                    nonConflictingRequestsQueue.addLast(entry);
0717:                }
0718:            }
0719:
0720:            private static final int UNASSIGNED_QUEUE = -1;
0721:            private static final int CONFLICTING_QUEUE = 0;
0722:            private static final int NON_CONFLICTING_QUEUE = 1;
0723:            private static final int STORED_PROCEDURE_QUEUE = 2;
0724:            private final Object atomicPostSyncObject = new Object();
0725:
0726:            /**
0727:             * Lock list variable is set by getQueueAndWriteLockTables and retrieved by
0728:             * atomicTaskPostInQueueAndReleaseLock. This is safe since this happens in the
0729:             * synchronized (ATOMIC_POST_SYNC_OBJECT) block.
0730:             */
0731:            private ArrayList lockList = null;
0732:
0733:            /**
0734:             * Fetch the next task from the backend total order queue and post it to one
0735:             * of the queues (conflicting or not).
0736:             * <p>
0737:             * Note that this method must be called within a synchronized block on this.
0738:             *
0739:             * @return true if an entry was processed, false if there is the total order
0740:             *         queue is empty.
0741:             */
0742:            private boolean fetchNextQueryFromBackendTotalOrderQueue() {
0743:                DatabaseSchema schema = backend.getDatabaseSchema();
0744:                TransactionMetaData tm = null;
0745:                int queueToUse = UNASSIGNED_QUEUE;
0746:
0747:                AbstractTask task;
0748:                AbstractRequest request;
0749:
0750:                // Fetch first task from queue
0751:                synchronized (totalOrderQueue) {
0752:                    if (totalOrderQueue.isEmpty())
0753:                        return false;
0754:                    task = (AbstractTask) totalOrderQueue.removeFirst();
0755:
0756:                    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) { /*
0757:                     * If asynchronous execution is allowed, we have to ensure that queries
0758:                     * of the same transaction are executed in order that is only one at a
0759:                     * time. We also have to ensure that late queries execute before new
0760:                     * queries accessing the same resources.
0761:                     */
0762:
0763:                        /*
0764:                         * SYNCHRONIZATION: this check has to be performed in a synchronized
0765:                         * block to avoid race conditions with terminating taks that perform
0766:                         * priority inversions. Such operations move tasks across the backend
0767:                         * queues and may invalidate the check.
0768:                         */
0769:                        synchronized (atomicPostSyncObject) {
0770:                            while (mustWaitForLateTask(task)) {
0771:                                totalOrderQueue.addFirst(task); // Put back request in queue
0772:                                // Behave as an empty queue, we will be notified when the blocking
0773:                                // query has completed
0774:                                return false;
0775:                            }
0776:                        }
0777:                    }
0778:
0779:                    // Now process the task
0780:                    request = task.getRequest();
0781:                    if (request == null || task instanceof  BeginTask) {
0782:                        addTaskInNonConflictingRequestsQueue(task, !task
0783:                                .isAutoCommit());
0784:                        return true;
0785:                    } else { // Parse the request if needed, should only happen at recover time
0786:                        try {
0787:                            if (!request.isParsed())
0788:                                request.parse(backend.getDatabaseSchema(),
0789:                                        ParsingGranularities.TABLE, false);
0790:                        } catch (SQLException e) {
0791:                            logger.warn("Parsing of request " + request
0792:                                    + " failed in recovery process", e);
0793:                        }
0794:                    }
0795:                    if (backend.isReplaying()) {
0796:                        /*
0797:                         * Read-only stored procedures can get logged when the schema is
0798:                         * unavailable, because no backends are enabled. They do not need to be
0799:                         * replayed and they may slow down recovery significantly.
0800:                         */
0801:                        if (request instanceof  StoredProcedure) {
0802:                            StoredProcedure sp = (StoredProcedure) request;
0803:                            DatabaseProcedureSemantic semantic = sp
0804:                                    .getSemantic();
0805:                            if (semantic != null && semantic.isReadOnly()) {
0806:                                task.notifySuccess(null);
0807:                                synchronized (this ) {// The RecoverThread may be waiting to add a request
0808:                                    notifyAll();
0809:                                }
0810:                                return true;
0811:                            }
0812:                        }
0813:                    }
0814:                    if (!request.isAutoCommit()) { // Retrieve the transaction marker metadata
0815:                        try {
0816:                            tm = requestManager
0817:                                    .getTransactionMetaData(new Long(request
0818:                                            .getTransactionId()));
0819:                        } catch (SQLException e) {
0820:                            // We didn't start or lazy start the transaction
0821:                            if (logger.isDebugEnabled())
0822:                                logger
0823:                                        .debug("No transaction medatada found for transaction "
0824:                                                + request.getTransactionId());
0825:                        }
0826:                    }
0827:
0828:                    if (schema == null) {
0829:                        try {
0830:                            task.notifyFailure((BackendWorkerThread) Thread
0831:                                    .currentThread(), 0, new SQLException(
0832:                                    "No schema available to perform request locking on backend "
0833:                                            + backend.getName()));
0834:                        } catch (SQLException ignore) {
0835:                            // Wait interrupted in notifyFailure
0836:                        }
0837:                        return true;
0838:                    }
0839:
0840:                    synchronized (atomicPostSyncObject) {
0841:                        lockList = null;
0842:
0843:                        boolean requestIsAStoredProcedure = request instanceof  StoredProcedure;
0844:                        if (requestIsAStoredProcedure)
0845:                            storedProcedureInQueue++;
0846:
0847:                        SortedSet writeLockedTables = request
0848:                                .getWriteLockedDatabaseTables();
0849:                        if ((writeLockedTables != null)
0850:                                && (writeLockedTables.size() > 1))
0851:                            writesWithMultipleLocks++;
0852:
0853:                        // Check if a stored procedure is locking the database
0854:                        TransactionLogicalLock globalLock = schema.getLock();
0855:                        if (globalLock.isLocked()) {
0856:                            if (request.isAutoCommit()) {
0857:                                queueToUse = STORED_PROCEDURE_QUEUE;
0858:                            } else {
0859:                                /*
0860:                                 * If we are the transaction executing the stored procedure, then we
0861:                                 * can proceed in the conflicting queue.
0862:                                 */
0863:                                if (globalLock.getLocker() == request
0864:                                        .getTransactionId())
0865:                                    queueToUse = NON_CONFLICTING_QUEUE;
0866:                                else {
0867:                                    /*
0868:                                     * If we are one of the transactions that already has acquired
0869:                                     * locks then we should try to complete our transaction else we
0870:                                     * stack in the stored procedure queue.
0871:                                     */
0872:                                    if ((tm == null)
0873:                                            || (tm.getAcquiredLocks(backend) == null)) {
0874:                                        // No locks taken so far, or transaction not [lazy] started =>
0875:                                        // go in the stored procedure queue
0876:                                        queueToUse = STORED_PROCEDURE_QUEUE;
0877:                                    }
0878:                                }
0879:                            }
0880:                        }
0881:
0882:                        if (queueToUse == UNASSIGNED_QUEUE) { // No stored procedure or transaction that started before the stored
0883:                            // procedure was posted
0884:                            if (request instanceof  AbstractWriteRequest
0885:                                    && !((AbstractWriteRequest) request)
0886:                                            .requiresGlobalLock()) {
0887:                                try {
0888:                                    queueToUse = getQueueAndWriteLockTables(
0889:                                            request, schema, tm);
0890:                                } catch (SQLException e) {
0891:                                    try {
0892:                                        task.notifyFailure(
0893:                                                (BackendWorkerThread) Thread
0894:                                                        .currentThread(), 0, e);
0895:                                    } catch (SQLException ignore) {
0896:                                        // Wait interrupted in notifyFailure
0897:                                    }
0898:                                    return true;
0899:                                }
0900:                            } else if (request instanceof  SelectRequest) {
0901:                                /*
0902:                                 * Note that SelectRequest scheduling is a little bit tricky to
0903:                                 * understand. Basically, we should just allow one select request at
0904:                                 * a time. If they are in different transactions, this is fine they
0905:                                 * will be properly isolated by the underlying database and queries
0906:                                 * from the same transaction are guaranteed to be executed in order
0907:                                 * (therefore they will go to the non-conflicting queue since their
0908:                                 * write lock set is null). If SELECT is in autocommit, we ensure
0909:                                 * that only one autocommit request is executed at a time, so
0910:                                 * finally we are safe in all cases. SELECT...FOR UPDATE are treated
0911:                                 * as writes since their write lock tables is set accordingly.
0912:                                 */
0913:                                try {
0914:                                    queueToUse = getQueueAndWriteLockTables(
0915:                                            request, schema, tm);
0916:                                } catch (SQLException e) {
0917:                                    try {
0918:                                        task.notifyFailure(
0919:                                                (BackendWorkerThread) Thread
0920:                                                        .currentThread(), 0, e);
0921:                                    } catch (SQLException ignore) {
0922:                                        // Wait interrupted in notifyFailure
0923:                                    }
0924:                                    return true;
0925:                                }
0926:                            } else {
0927:                                if (requestIsAStoredProcedure) {
0928:                                    StoredProcedure sp = (StoredProcedure) request;
0929:                                    DatabaseProcedureSemantic semantic = sp
0930:                                            .getSemantic();
0931:                                    if (semantic != null) {
0932:                                        // Try to optimize the stored procedure execution based on its
0933:                                        // semantic information
0934:                                        if (semantic.isReadOnly()
0935:                                                || (request
0936:                                                        .getWriteLockedDatabaseTables() == null))
0937:                                            queueToUse = NON_CONFLICTING_QUEUE;
0938:                                        else {
0939:                                            try {
0940:                                                queueToUse = getQueueAndWriteLockTables(
0941:                                                        request, schema, tm);
0942:                                            } catch (SQLException e) {
0943:                                                try {
0944:                                                    task
0945:                                                            .notifyFailure(
0946:                                                                    (BackendWorkerThread) Thread
0947:                                                                            .currentThread(),
0948:                                                                    0, e);
0949:                                                } catch (SQLException ignore) {
0950:                                                    // Wait interrupted in notifyFailure
0951:                                                }
0952:                                                return true;
0953:                                            }
0954:                                            if (semantic.isCommutative())
0955:                                                queueToUse = NON_CONFLICTING_QUEUE;
0956:                                        }
0957:                                    }
0958:                                }
0959:
0960:                                if (queueToUse == UNASSIGNED_QUEUE) {
0961:                                    /*
0962:                                     * Stored procedure or unknown query, let's assume it blocks the
0963:                                     * whole database. Check if we can lock everything else we wait
0964:                                     * for all locks to be free.
0965:                                     */
0966:                                    if (!globalLock.isLocked()) { // Lock the whole database so that we can execute when all
0967:                                        // locks are released
0968:                                        globalLock.acquire(request);
0969:                                        if (tm != null)
0970:                                            tm.addAcquiredLock(backend,
0971:                                                    globalLock);
0972:                                        else {
0973:                                            if (lockList == null)
0974:                                                lockList = new ArrayList();
0975:                                            lockList.add(globalLock);
0976:                                        }
0977:                                        if (schema
0978:                                                .allTablesAreUnlockedOrLockedByTransaction(request))
0979:                                            // Clear to go, all locks are free
0980:                                            queueToUse = NON_CONFLICTING_QUEUE;
0981:                                        else
0982:                                            // We will have to wait for everyone to release its locks
0983:                                            queueToUse = STORED_PROCEDURE_QUEUE;
0984:                                    } else { /*
0985:                                     * A stored procedure is holding the lock but we are in a
0986:                                     * transaction that already acquired locks so we are authorized
0987:                                     * to complete.
0988:                                     */
0989:                                        if (schema
0990:                                                .allTablesAreUnlockedOrLockedByTransaction(request)) {
0991:                                            queueToUse = NON_CONFLICTING_QUEUE;
0992:                                            List locks = schema
0993:                                                    .lockAllTables(request);
0994:                                            if (tm != null)
0995:                                                tm.addAcquiredLocks(backend,
0996:                                                        locks);
0997:                                            else {
0998:                                                if (lockList == null)
0999:                                                    lockList = new ArrayList();
1000:                                                lockList.add(locks);
1001:                                            }
1002:                                        } else { /*
1003:                                         * We will have to wait for the completion of the transaction
1004:                                         * of the stored procedure currently holding the global lock.
1005:                                         */
1006:                                            queueToUse = STORED_PROCEDURE_QUEUE;
1007:                                        }
1008:                                    }
1009:                                }
1010:                            }
1011:                        }
1012:
1013:                        if (queueToUse == NON_CONFLICTING_QUEUE) {
1014:                            if (logger.isDebugEnabled())
1015:                                logger.debug("Scheduling request " + request
1016:                                        + " in non conflicting queue");
1017:                            addTaskInNonConflictingRequestsQueue(task, false);
1018:                        } else if (queueToUse == CONFLICTING_QUEUE) {
1019:                            if (logger.isDebugEnabled())
1020:                                logger.debug("Scheduling request " + request
1021:                                        + " in conflicting queue");
1022:                            addTaskInConflictingRequestsQueue(task);
1023:                        } else if (queueToUse == STORED_PROCEDURE_QUEUE) {
1024:                            if (logger.isDebugEnabled())
1025:                                logger.debug("Scheduling request " + request
1026:                                        + " in stored procedure queue");
1027:                            addTaskInStoredProcedureQueue(task);
1028:                        }
1029:
1030:                        task.setLocks(backend, lockList);
1031:                    } // synchronized (atomicPostSyncObject)
1032:                } // synchronized (totalOrderQueue)
1033:
1034:                return true;
1035:            }
1036:
1037:            /**
1038:             * Schedule a query that takes write on multiple tables and tell in which
1039:             * queue the task should be posted. This updates the conflictingTable above if
1040:             * the query must be posted in the CONFLICTING_QUEUE and this always update
1041:             * the list of locks taken by this request (either directly in tm lock list if
1042:             * tm is not null, or by updating lockList defined above)
1043:             *
1044:             * @param request the request to schedule
1045:             * @param schema the current database schema containing lock information
1046:             * @param tm the transaction marker metadata (null if request is autocommit)
1047:             * @return the queue to use (NON_CONFLICTING_QUEUE or CONFLICTING_QUEUE)
1048:             * @throws SQLException if a table is not found in the schema and
1049:             *           enforceTableExistenceIntoSchema is set to true for the VDB
1050:             */
1051:            private int getQueueAndWriteLockTables(AbstractRequest request,
1052:                    DatabaseSchema schema, TransactionMetaData tm)
1053:                    throws SQLException {
1054:                SortedSet writeLockedTables = request
1055:                        .getWriteLockedDatabaseTables();
1056:
1057:                if (writeLockedTables == null || writeLockedTables.isEmpty()) { // This request does not lock anything
1058:                    return NON_CONFLICTING_QUEUE;
1059:                } else if (request.isCreate() && writeLockedTables.size() == 1) { // This request does not lock anything
1060:                    // create table : we do not need to execute
1061:                    // in conflicting queue, but we have to lock the table for recovery
1062:                    // operations (that are done in a parallel way)
1063:                    return NON_CONFLICTING_QUEUE;
1064:                }
1065:
1066:                /*
1067:                 * Assume that we will get all locks and that we will execute in the
1068:                 * non-conflicting queue. If there is any issue, the queue will be set to
1069:                 * conflicting queue.
1070:                 */
1071:                int queueToUse = NON_CONFLICTING_QUEUE;
1072:                for (Iterator iter = writeLockedTables.iterator(); iter
1073:                        .hasNext();) {
1074:                    String tableName = (String) iter.next();
1075:                    DatabaseTable table = schema.getTable(tableName, false);
1076:                    if (table == null) { // table not found in the database schema.
1077:                        if (request.isCreate()
1078:                                && tableName.equals(((CreateRequest) request)
1079:                                        .getTableName())) {
1080:                            // We are trying to create a table, so it can obiously not be found in
1081:                            // the database schema. Let's go for the conflicting queue.
1082:                            logger.warn("Creating table "
1083:                                    + tableName
1084:                                    + ", scheduling query "
1085:                                    + request.toStringShortForm(requestManager
1086:                                            .getVirtualDatabase()
1087:                                            .getSqlShortFormLength())
1088:                                    + " in conflicting queue.");
1089:                            queueToUse = CONFLICTING_QUEUE;
1090:                            continue;
1091:                        }
1092:
1093:                        // Check if it is a session-dependant temporary table that could not be
1094:                        // found in the database schema
1095:                        if (!request.isAutoCommit()) {
1096:                            PooledConnection pc = backend.getConnectionManager(
1097:                                    request.getLogin())
1098:                                    .retrieveConnectionForTransaction(
1099:                                            request.getTransactionId());
1100:                            if (pc != null
1101:                                    && pc.existsTemporaryTable(tableName))
1102:                                continue;
1103:                        } else if (request.isPersistentConnection()) {
1104:                            try {
1105:                                PooledConnection pc = backend
1106:                                        .getConnectionManager(
1107:                                                request.getLogin())
1108:                                        .retrieveConnectionInAutoCommit(request);
1109:                                if (pc != null
1110:                                        && pc.existsTemporaryTable(tableName))
1111:                                    continue;
1112:                            } catch (UnreachableBackendException e) {
1113:                            }
1114:                        }
1115:
1116:                        // At this point, we did not find the table either in the database
1117:                        // schema, nor inside the connection's context.
1118:                        if (!request.tableExistenceCheckIsDisabled()
1119:                                && requestManager.getVirtualDatabase()
1120:                                        .enforceTableExistenceIntoSchema()) {
1121:                            String errMsg = "Unable to find table "
1122:                                    + tableName
1123:                                    + " in database schema, rejecting query "
1124:                                    + request.toStringShortForm(requestManager
1125:                                            .getVirtualDatabase()
1126:                                            .getSqlShortFormLength()) + ".";
1127:                            logger.warn(errMsg);
1128:                            throw new SQLException(errMsg);
1129:                        } else {
1130:                            // No table found, let's go for the conflicting queue
1131:                            logger.warn("Unable to find table "
1132:                                    + tableName
1133:                                    + " in database schema, scheduling query "
1134:                                    + request.toStringShortForm(requestManager
1135:                                            .getVirtualDatabase()
1136:                                            .getSqlShortFormLength())
1137:                                    + " in conflicting queue.");
1138:                            queueToUse = CONFLICTING_QUEUE;
1139:                        }
1140:                    } else { /*
1141:                     * If we get the lock we go in the non conflicting queue else we go in
1142:                     * the conflicting queue
1143:                     */
1144:                        if (!table.getLock().acquire(request)) {
1145:                            queueToUse = CONFLICTING_QUEUE;
1146:                            if (logger.isDebugEnabled())
1147:                                logger.debug("Request " + request
1148:                                        + " waits for lock on table " + table);
1149:                        }
1150:                        if (tm != null)
1151:                            tm.addAcquiredLock(backend, table.getLock());
1152:                        else {
1153:                            if (lockList == null)
1154:                                lockList = new ArrayList();
1155:                            lockList.add(table.getLock());
1156:                        }
1157:                    }
1158:                }
1159:                return queueToUse;
1160:            }
1161:
1162:            /**
1163:             * Release the locks acquired by a request executed in autocommit mode.
1164:             *
1165:             * @param locks the list of locks acquired by the request
1166:             * @param transactionId the "fake" transaction id assign to the autocommit
1167:             *          request releasing the locks
1168:             */
1169:            private void releaseLocksForAutoCommitRequest(List locks,
1170:                    long transactionId) {
1171:                if (locks == null)
1172:                    return; // No locks acquired
1173:                for (Iterator iter = locks.iterator(); iter.hasNext();) {
1174:                    TransactionLogicalLock lock = (TransactionLogicalLock) iter
1175:                            .next();
1176:                    if (lock == null)
1177:                        logger.warn("Unexpected null lock for transaction "
1178:                                + transactionId + " when releasing "
1179:                                + locks.toArray());
1180:                    else
1181:                        lock.release(transactionId);
1182:                }
1183:            }
1184:
1185:            /**
1186:             * Release the locks held by the given transaction at commit/rollback time.
1187:             *
1188:             * @param transactionId the transaction releasing the locks
1189:             */
1190:            private void releaseLocksForTransaction(long transactionId) {
1191:                try {
1192:                    TransactionMetaData tm = requestManager
1193:                            .getTransactionMetaData(new Long(transactionId));
1194:                    releaseLocksForAutoCommitRequest(tm
1195:                            .removeBackendLocks(backend), transactionId);
1196:                } catch (SQLException e) {
1197:                    /*
1198:                     * this is expected to fail when replaying the recovery log, since the
1199:                     * request manager won't have any transaction metadatas for transactions
1200:                     * we are replaying => we don't log warnings in this case.
1201:                     */
1202:                    if (!backend.isReplaying())
1203:                        if (logger.isWarnEnabled())
1204:                            logger
1205:                                    .warn("No transaction medatada found for transaction "
1206:                                            + transactionId
1207:                                            + " releasing locks manually");
1208:                    if (backend.getDatabaseSchema() != null)
1209:                        backend.getDatabaseSchema().releaseLocksOnAllTables(
1210:                                transactionId);
1211:                    else {
1212:                        /*
1213:                         * At this point, schema can be null, for example if the backend is down
1214:                         */
1215:                        if (logger.isWarnEnabled())
1216:                            logger
1217:                                    .warn("Cannot release locks, as no schema is available on this backend. "
1218:                                            + "This backend is problably not available anymore.");
1219:                    }
1220:                }
1221:            }
1222:
1223:            private TransactionMetaData getTransactionMetaData(
1224:                    AbstractRequest request) {
1225:                TransactionMetaData tm = null;
1226:                if ((request != null) && !request.isAutoCommit()) { // Retrieve the transaction marker metadata
1227:                    try {
1228:                        tm = requestManager.getTransactionMetaData(new Long(
1229:                                request.getTransactionId()));
1230:                    } catch (SQLException e) {
1231:                        // We didn't start or lazy start the transaction
1232:                        if (logger.isDebugEnabled())
1233:                            logger
1234:                                    .debug("No transaction medatada found for transaction "
1235:                                            + request.getTransactionId());
1236:                    }
1237:                }
1238:                return tm;
1239:            }
1240:
1241:            /**
1242:             * Notify the completion of the given entry. The corresponding task completion
1243:             * is notified to the backend.
1244:             *
1245:             * @param entry the executed entry
1246:             */
1247:            public final void completedEntryExecution(
1248:                    BackendTaskQueueEntry entry) {
1249:                completedEntryExecution(entry, null);
1250:            }
1251:
1252:            /**
1253:             * Perform the cleanup to release locks and priority inversion checkings after
1254:             * a stored procedure execution
1255:             *
1256:             * @param task the task that completed
1257:             */
1258:            public void completeStoredProcedureExecution(AbstractTask task) {
1259:                AbstractRequest request = task.getRequest();
1260:                long transactionId = request.getTransactionId();
1261:                synchronized (atomicPostSyncObject) {
1262:                    if (request.isAutoCommit()) {
1263:                        releaseLocksForAutoCommitRequest(
1264:                                task.getLocks(backend), transactionId);
1265:                        checkForPriorityInversion();
1266:                        SortedSet writeLockedTables = request
1267:                                .getWriteLockedDatabaseTables();
1268:                        if ((writeLockedTables != null)
1269:                                && (writeLockedTables.size() > 1))
1270:                            writesWithMultipleLocks--;
1271:                    }
1272:                    storedProcedureInQueue--;
1273:                }
1274:            }
1275:
1276:            /**
1277:             * Perform the cleanup to release locks and priority inversion checkings after
1278:             * a write query execution
1279:             *
1280:             * @param task the task that completed
1281:             */
1282:            public void completeWriteRequestExecution(AbstractTask task) {
1283:                AbstractRequest request = task.getRequest();
1284:                SortedSet writeLockedTables = request
1285:                        .getWriteLockedDatabaseTables();
1286:                if ((writeLockedTables != null)
1287:                        && (writeLockedTables.size() > 1))
1288:                    synchronized (atomicPostSyncObject) {
1289:                        writesWithMultipleLocks--;
1290:                    }
1291:
1292:                long transactionId = request.getTransactionId();
1293:                if (request.isAutoCommit()) {
1294:                    synchronized (atomicPostSyncObject) {
1295:                        releaseLocksForAutoCommitRequest(
1296:                                task.getLocks(backend), transactionId);
1297:                        // Make sure we release the requests locking multiple tables or the
1298:                        // stored procedures that are blocked if any
1299:                        if (writesWithMultipleLocks > 0
1300:                                || waitForCompletionPolicy
1301:                                        .isEnforceTableLocking())
1302:                            checkForPriorityInversion();
1303:                        else if (storedProcedureInQueue > 0)
1304:                            checkForPriorityInversion();
1305:                    }
1306:                }
1307:            }
1308:
1309:            /**
1310:             * Releasing locks and checking for priority inversion. Usually used at commit
1311:             * or rollback completion time.
1312:             *
1313:             * @param tm the transaction metadata
1314:             */
1315:            public void releaseLocksAndCheckForPriorityInversion(
1316:                    TransactionMetaData tm) {
1317:                synchronized (atomicPostSyncObject) {
1318:                    releaseLocksForTransaction(tm.getTransactionId());
1319:                    checkForPriorityInversion();
1320:                }
1321:            }
1322:
1323:            /**
1324:             * Removes the specified entry from its queue and notifies threads waiting on
1325:             * this backend task queue. The removal is performed using the iterator, if
1326:             * specified (non-null), or directly on the queue otherwize.
1327:             *
1328:             * @param entry the entry to remove from its queue
1329:             * @param iter the iterator on which to call remove(), or null if not
1330:             *          applicable.
1331:             */
1332:            private void completedEntryExecution(BackendTaskQueueEntry entry,
1333:                    Iterator iter) {
1334:                if (entry == null)
1335:                    return;
1336:
1337:                // Notify the backend that this query execution is complete
1338:                AbstractTask task = entry.getTask();
1339:                if (!backend.removePendingTask(task))
1340:                    logger.warn("Unable to remove task " + task
1341:                            + " from pending request queue");
1342:
1343:                synchronized (this ) {
1344:                    // Remove the entry from its queue
1345:                    LinkedList queue = entry.getQueue();
1346:                    synchronized (queue) {
1347:                        if (iter != null)
1348:                            iter.remove();
1349:                        else {
1350:                            if (!queue.remove(entry))
1351:                                logger.error("Failed to remove task " + task
1352:                                        + " from " + queue);
1353:                        }
1354:                    }
1355:
1356:                    // Notify the queues to unblock queries waiting in getNextEntryToExecute
1357:                    // for the completion of the current request.
1358:                    this .notifyAll();
1359:                }
1360:            }
1361:
1362:            /**
1363:             * Return the first entry in the conflicting requests queue (does not remove
1364:             * it from the list).
1365:             *
1366:             * @return the first entry in the conflicting queue
1367:             */
1368:            public final BackendTaskQueueEntry getFirstConflictingRequestQueueOrStoredProcedureQueueEntry() {
1369:                synchronized (conflictingRequestsQueue) {
1370:                    if (conflictingRequestsQueue.isEmpty()) {
1371:                        synchronized (storedProcedureQueue) {
1372:                            if (storedProcedureQueue.isEmpty())
1373:                                return null;
1374:                            return (BackendTaskQueueEntry) storedProcedureQueue
1375:                                    .getFirst();
1376:                        }
1377:                    }
1378:                    return (BackendTaskQueueEntry) conflictingRequestsQueue
1379:                            .getFirst();
1380:                }
1381:            }
1382:
1383:            /**
1384:             * Returns the stored procedure queue. This is needed for deadlock detection
1385:             * but clearly does break the abstarction layer as it exposes a private field
1386:             * in an un-controlled way.
1387:             *
1388:             * @return the stored procedure queue.
1389:             */
1390:            public List getStoredProcedureQueue() {
1391:                return storedProcedureQueue;
1392:            }
1393:
1394:            /**
1395:             * Get the next available task entry to process from the queues. If the
1396:             * backend is killed, this method will return a KillThreadTask else it will
1397:             * wait for a task to be ready to be executed. Note that the task is left in
1398:             * the queue and flagged as processed by the thread given as a parameter. The
1399:             * task will only be removed from the queue when the thread notifies the
1400:             * completion of the task.
1401:             *
1402:             * @param thread the thread that will execute the task
1403:             * @return the task to execute
1404:             */
1405:            public final BackendTaskQueueEntry getNextEntryToExecute(
1406:                    BackendWorkerThread thread) {
1407:                BackendTaskQueueEntry entry = null;
1408:
1409:                /*
1410:                 * The strategy is to look first for the non-conflicting queue so that
1411:                 * non-conflicting transactions could progress as fast as possible. Then we
1412:                 * check the conflicting queue if we did not find a task to execute.<p> If
1413:                 * we failed to find something to execute in the active queues, we process
1414:                 * everything available in the total order queue to push the tasks in the
1415:                 * active queues.
1416:                 */
1417:
1418:                while (true) {
1419:                    Object firstNonConflictingTask = null;
1420:                    Object lastNonConflictingTask = null;
1421:                    // Check the non-conflicting queue first
1422:                    synchronized (nonConflictingRequestsQueue) {
1423:                        if (!nonConflictingRequestsQueue.isEmpty()) {
1424:                            firstNonConflictingTask = nonConflictingRequestsQueue
1425:                                    .getFirst();
1426:                            lastNonConflictingTask = nonConflictingRequestsQueue
1427:                                    .getLast();
1428:                            for (Iterator iter = nonConflictingRequestsQueue
1429:                                    .iterator(); iter.hasNext();) {
1430:                                entry = (BackendTaskQueueEntry) iter.next();
1431:                                if (entry.getProcessingThread() == null) { // This task is not currently processed, let's execute it
1432:                                    entry.setProcessingThread(thread);
1433:                                    return entry;
1434:                                }
1435:                            }
1436:                        }
1437:                    }
1438:
1439:                    // Nothing to be executed now in the non-conflicting queue, check the
1440:                    // conflicting queue
1441:                    Object firstConflictingTask = null;
1442:                    Object lastConflictingTask = null;
1443:                    synchronized (conflictingRequestsQueue) {
1444:                        if (!conflictingRequestsQueue.isEmpty()) {
1445:                            firstConflictingTask = conflictingRequestsQueue
1446:                                    .getFirst();
1447:                            lastConflictingTask = conflictingRequestsQueue
1448:                                    .getLast();
1449:                            // Only check the first task since we must execute them only one at a
1450:                            // time.
1451:                            entry = (BackendTaskQueueEntry) conflictingRequestsQueue
1452:                                    .getFirst();
1453:                            if (entry.getProcessingThread() == null) { // The task is not currently processed.
1454:                                AbstractRequest request = entry.getTask()
1455:                                        .getRequest();
1456:                                SortedSet lockedTables = request
1457:                                        .getWriteLockedDatabaseTables();
1458:                                if ((lockedTables != null)
1459:                                        && (lockedTables.size() > 0)) {
1460:                                    /**
1461:                                     * Check if there are requests in the non-conflicting queue that
1462:                                     * belongs to a transaction that is holding a lock on which we
1463:                                     * conflict.
1464:                                     * <p>
1465:                                     * Note that if we need to lock multiple tables and that we are in
1466:                                     * the conflicting queue, we are going to wait until all locks are
1467:                                     * free or a deadlock detection occurs.
1468:                                     */
1469:                                    boolean conflictingQueryDetected = false;
1470:                                    synchronized (nonConflictingRequestsQueue) {
1471:                                        if (!nonConflictingRequestsQueue
1472:                                                .isEmpty()
1473:                                                || waitForCompletionPolicy
1474:                                                        .isEnforceTableLocking()) { // Check for a potential conflict
1475:                                            int locksNotOwnedByMe = 0;
1476:                                            long transactionId = entry
1477:                                                    .getTask()
1478:                                                    .getTransactionId();
1479:                                            DatabaseSchema schema = backend
1480:                                                    .getDatabaseSchema();
1481:                                            for (Iterator iterator = lockedTables
1482:                                                    .iterator(); iterator
1483:                                                    .hasNext()
1484:                                                    && !conflictingQueryDetected;) {
1485:                                                String tableName = (String) iterator
1486:                                                        .next();
1487:                                                DatabaseTable table = schema
1488:                                                        .getTable(tableName,
1489:                                                                false);
1490:                                                if (table == null) { // No table found, let's go for the conflicting queue
1491:                                                    logger
1492:                                                            .warn("Unable to find table "
1493:                                                                    + tableName
1494:                                                                    + " in database schema, when getting next entry to execute : "
1495:                                                                    + request
1496:                                                                            .toStringShortForm(requestManager
1497:                                                                                    .getVirtualDatabase()
1498:                                                                                    .getSqlShortFormLength()));
1499:
1500:                                                    // Assume conflict since non-conflicting queue is not
1501:                                                    // empty
1502:                                                    conflictingQueryDetected = true;
1503:                                                } else {
1504:                                                    TransactionLogicalLock lock = table
1505:                                                            .getLock();
1506:                                                    if (lock.isLocked()) {
1507:                                                        if (lock.getLocker() != transactionId)
1508:                                                            locksNotOwnedByMe++;
1509:
1510:                                                        /*
1511:                                                         * Check if we find a query in the conflicting queue
1512:                                                         * that owns the lock or waits for the lock we need
1513:                                                         */
1514:                                                        for (Iterator iter = nonConflictingRequestsQueue
1515:                                                                .iterator(); iter
1516:                                                                .hasNext();) {
1517:                                                            BackendTaskQueueEntry nonConflictingEntry = (BackendTaskQueueEntry) iter
1518:                                                                    .next();
1519:                                                            long nonConflictingRequestTransactionId = nonConflictingEntry
1520:                                                                    .getTask()
1521:                                                                    .getTransactionId();
1522:                                                            if ((lock
1523:                                                                    .getLocker() == nonConflictingRequestTransactionId)
1524:                                                                    || lock
1525:                                                                            .isWaiting(nonConflictingRequestTransactionId)) {
1526:                                                                conflictingQueryDetected = true;
1527:                                                                break;
1528:                                                            }
1529:                                                        }
1530:                                                    }
1531:                                                }
1532:                                            }
1533:
1534:                                            /*
1535:                                             * If table level locking is enforced, we don't allow a
1536:                                             * request to execute before it has all its locks
1537:                                             */
1538:                                            if (waitForCompletionPolicy
1539:                                                    .isEnforceTableLocking())
1540:                                                conflictingQueryDetected = locksNotOwnedByMe > 0;
1541:
1542:                                            /*
1543:                                             * If we don't own a single lock (in case of multiple locks)
1544:                                             * needed by this query then we wait for the locks to be
1545:                                             * released or the deadlock detection to abort a transaction
1546:                                             * that is holding at least one of the locks that we need.
1547:                                             */
1548:                                            conflictingQueryDetected = conflictingQueryDetected
1549:                                                    || ((locksNotOwnedByMe > 1) && (locksNotOwnedByMe == lockedTables
1550:                                                            .size()));
1551:                                        }
1552:                                    }
1553:
1554:                                    // If everyone is done in the non-conflicting queue, then
1555:                                    // let's go with this conflicting request
1556:                                    if (!conflictingQueryDetected) {
1557:                                        entry.setProcessingThread(thread);
1558:                                        return entry;
1559:                                    }
1560:                                } else {
1561:                                    if (logger.isWarnEnabled())
1562:                                        logger
1563:                                                .warn("Detected non-locking task "
1564:                                                        + entry.getTask()
1565:                                                        + " in conflicting queue");
1566:
1567:                                    /*
1568:                                     * No clue on where the conflict happens, it might well be that we
1569:                                     * don't access any table but in that case we shouldn't have ended
1570:                                     * up in the conflicting queue. To be safer, let's wait for the
1571:                                     * non-conflicting queue to be empty.
1572:                                     */
1573:                                    synchronized (nonConflictingRequestsQueue) {
1574:                                        if (nonConflictingRequestsQueue
1575:                                                .isEmpty()) {
1576:                                            entry.setProcessingThread(thread);
1577:                                            return entry;
1578:                                        }
1579:                                    }
1580:                                }
1581:                            }
1582:                        }
1583:                    }
1584:
1585:                    synchronized (this ) {
1586:                        // No entry in the queues or all entries are currently processed,
1587:                        // process the total order queue.
1588:                        if (fetchNextQueryFromBackendTotalOrderQueue())
1589:                            continue;
1590:
1591:                        // Nothing in the total order queue either !
1592:                        // Double-check that something was not posted in the queue after we
1593:                        // scanned it
1594:                        synchronized (nonConflictingRequestsQueue) {
1595:                            if (!nonConflictingRequestsQueue.isEmpty()) {
1596:                                if (firstNonConflictingTask != nonConflictingRequestsQueue
1597:                                        .getFirst())
1598:                                    continue;
1599:                                if (lastNonConflictingTask != nonConflictingRequestsQueue
1600:                                        .getLast())
1601:                                    continue;
1602:                            } else if (firstNonConflictingTask != null)
1603:                                continue; // The queue was emptied all at once
1604:                        }
1605:                        synchronized (conflictingRequestsQueue) {
1606:                            if (!conflictingRequestsQueue.isEmpty()) {
1607:                                if (firstConflictingTask != conflictingRequestsQueue
1608:                                        .getFirst())
1609:                                    continue;
1610:                                if (lastConflictingTask != conflictingRequestsQueue
1611:                                        .getLast())
1612:                                    continue;
1613:                            } else if (firstConflictingTask != null)
1614:                                continue; // The queue was emptied all at once
1615:                        }
1616:
1617:                        // Wait until a new task is posted
1618:                        try {
1619:                            this .wait();
1620:                        } catch (InterruptedException ignore) {
1621:                        }
1622:                    }
1623:
1624:                }
1625:            }
1626:
1627:            /**
1628:             * Get the next available commit or rollback task to process from the queues.
1629:             * If the backend is killed, this method will return a KillThreadTask else it
1630:             * will wait for a task to be ready to be executed. Note that the task is left
1631:             * in the queue and flagged as processed by the thread given as a parameter.
1632:             * The task will only be removed from the queue when the thread notifies the
1633:             * completion of the task.
1634:             *
1635:             * @param thread the thread that will execute the task
1636:             * @return the commit or rollback task to execute
1637:             */
1638:            public BackendTaskQueueEntry getNextCommitRollbackToExecute(
1639:                    BackendWorkerThread thread) {
1640:                boolean found = false;
1641:                BackendTaskQueueEntry entry = null;
1642:                while (!found) {
1643:                    Object firstNonConflictingTask = null;
1644:                    Object lastNonConflictingTask = null;
1645:                    // Check the non-conflicting queue first
1646:                    synchronized (nonConflictingRequestsQueue) {
1647:                        if (!nonConflictingRequestsQueue.isEmpty()) {
1648:                            firstNonConflictingTask = nonConflictingRequestsQueue
1649:                                    .getFirst();
1650:                            lastNonConflictingTask = nonConflictingRequestsQueue
1651:                                    .getLast();
1652:                            for (Iterator iter = nonConflictingRequestsQueue
1653:                                    .iterator(); iter.hasNext();) {
1654:                                entry = (BackendTaskQueueEntry) iter.next();
1655:                                if ((entry.isACommitOrRollback() || (entry
1656:                                        .getTask() instanceof  KillThreadTask))
1657:                                        && (entry.getProcessingThread() == null)) { // This task is not currently processed, let's execute it
1658:                                    entry.setProcessingThread(thread);
1659:                                    return entry;
1660:                                }
1661:                            }
1662:                        }
1663:                    }
1664:
1665:                    synchronized (this ) {
1666:                        // No entry in the queues or all entries are currently processed,
1667:                        // process the total order queue.
1668:                        if (fetchNextQueryFromBackendTotalOrderQueue())
1669:                            continue;
1670:
1671:                        // Double-check that something was not posted in the queue after we
1672:                        // scanned it
1673:                        synchronized (nonConflictingRequestsQueue) {
1674:                            if (!nonConflictingRequestsQueue.isEmpty()) {
1675:                                if (firstNonConflictingTask != nonConflictingRequestsQueue
1676:                                        .getFirst())
1677:                                    continue;
1678:                                if (lastNonConflictingTask != nonConflictingRequestsQueue
1679:                                        .getLast())
1680:                                    continue;
1681:                            }
1682:                        }
1683:
1684:                        try {
1685:                            this .wait();
1686:                        } catch (InterruptedException ignore) {
1687:                        }
1688:                    }
1689:
1690:                }
1691:                // We should never reach this point
1692:                return null;
1693:            }
1694:
1695:            /**
1696:             * Checks if the current entry needs to wait for a later entry before being
1697:             * able to execute.
1698:             *
1699:             * @param currentTask the current <code>AbstractTask</code> candidate for
1700:             *          scheduling
1701:             * @return <code>true</code> if the current task needs to wait for a late
1702:             *         task before being able to execute, <code>false</code> else
1703:             */
1704:            private boolean mustWaitForLateTask(AbstractTask currentTask) {
1705:                if (currentTask.isPersistentConnection()) {
1706:                    long currentCid = currentTask.getPersistentConnectionId();
1707:                    // Check if there are other requests for this transaction in
1708:                    // the queue
1709:                    if (hasTaskForPersistentConnectionInQueue(
1710:                            nonConflictingRequestsQueue, currentCid)
1711:                            || hasTaskForPersistentConnectionInQueue(
1712:                                    conflictingRequestsQueue, currentCid)
1713:                            || hasTaskForPersistentConnectionInQueue(
1714:                                    storedProcedureQueue, currentCid))
1715:                        // Skip this commit/rollback until the conflicting request completes
1716:                        return true;
1717:                }
1718:
1719:                if (!currentTask.isAutoCommit()) {
1720:                    long currentTid = currentTask.getTransactionId();
1721:                    // Check if there are other requests for this transaction in
1722:                    // the queue
1723:                    if (hasTaskForTransactionInQueue(
1724:                            nonConflictingRequestsQueue, currentTid)
1725:                            || hasTaskForTransactionInQueue(
1726:                                    conflictingRequestsQueue, currentTid)
1727:                            || hasTaskForTransactionInQueue(
1728:                                    storedProcedureQueue, currentTid))
1729:                        // Skip this commit/rollback until the conflicting request completes
1730:                        return true;
1731:                }
1732:
1733:                return hasDDLTaskInQueue(nonConflictingRequestsQueue)
1734:                        || hasDDLTaskInQueue(conflictingRequestsQueue)
1735:                        || hasDDLTaskInQueue(storedProcedureQueue);
1736:            }
1737:
1738:            private boolean hasDDLTaskInQueue(List queue) {
1739:                boolean retry;
1740:                do {
1741:                    retry = false;
1742:                    try {
1743:                        for (Iterator iter = queue.iterator(); iter.hasNext();) {
1744:                            BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1745:                                    .next();
1746:                            AbstractTask otherTask = otherEntry.getTask();
1747:                            AbstractRequest request = otherTask.getRequest();
1748:                            /**
1749:                             * For the moment just check if this is a create, drop or alter
1750:                             * statement, we could also check
1751:                             * AbstractRequest#altersDatabaseSchema() but we don't want to block
1752:                             * if this is not a DDL (a stored procedure might alter the schema
1753:                             * because of its default semantic but still might need other queries
1754:                             * to be executed before it can really execute).
1755:                             */
1756:                            if ((request != null)
1757:                                    && (request.isCreate() || request.isAlter() || request
1758:                                            .isDrop())) {
1759:                                return true;
1760:                            }
1761:                        }
1762:                    } catch (ConcurrentModificationException e) {
1763:                        retry = true;
1764:                    }
1765:                } while (retry);
1766:                return false;
1767:            }
1768:
1769:            private boolean hasTaskForPersistentConnectionInQueue(List queue,
1770:                    long cid) {
1771:                boolean retry;
1772:                do {
1773:                    retry = false;
1774:                    try {
1775:                        for (Iterator iter = queue.iterator(); iter.hasNext();) {
1776:                            BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1777:                                    .next();
1778:
1779:                            AbstractTask otherTask = otherEntry.getTask();
1780:
1781:                            // Check if the query is in the same transaction
1782:                            if (otherTask.isPersistentConnection()
1783:                                    && (otherTask.getPersistentConnectionId() == cid)) {
1784:                                return true;
1785:                            }
1786:                        }
1787:                    } catch (ConcurrentModificationException e) {
1788:                        retry = true;
1789:                    }
1790:                } while (retry);
1791:                return false;
1792:            }
1793:
1794:            private boolean hasTaskForTransactionInQueue(List queue, long tid) {
1795:                boolean retry;
1796:                do {
1797:                    retry = false;
1798:                    try {
1799:                        for (Iterator iter = queue.iterator(); iter.hasNext();) {
1800:                            BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1801:                                    .next();
1802:
1803:                            AbstractTask otherTask = otherEntry.getTask();
1804:
1805:                            // Check if the query is in the same transaction
1806:                            if (!otherTask.isAutoCommit()
1807:                                    && (otherTask.getTransactionId() == tid)) {
1808:                                return true;
1809:                            }
1810:                        }
1811:                    } catch (ConcurrentModificationException e) {
1812:                        retry = true;
1813:                    }
1814:                } while (retry);
1815:                return false;
1816:            }
1817:
1818:            /**
1819:             * Return true if tasks are allowed to be posted to the queue. If false, all
1820:             * tasks posted to the queue are systematically notified for completion
1821:             * without being executed (abort behavior)
1822:             *
1823:             * @return Returns the allowTasksToBePosted.
1824:             */
1825:            public boolean allowTasksToBePosted() {
1826:                synchronized (ALLOW_TASKS_SYNC) {
1827:                    return allowTasksToBePosted;
1828:                }
1829:            }
1830:
1831:            /**
1832:             * Set to true if tasks are allowed to be posted to the queue else, all tasks
1833:             * posted to the queue are systematically notified for completion without
1834:             * being executed (abort behavior)
1835:             *
1836:             * @param allowTasksToBePosted The allowTasksToBePosted to set.
1837:             */
1838:            public void setAllowTasksToBePosted(boolean allowTasksToBePosted) {
1839:                synchronized (ALLOW_TASKS_SYNC) {
1840:                    this .allowTasksToBePosted = allowTasksToBePosted;
1841:                }
1842:            }
1843:
1844:            /**
1845:             * Start a new Deadlock Detection Thread (throws a RuntimeException if called
1846:             * twice without stopping the thread before the second call).
1847:             *
1848:             * @param vdb the virtual database the backend is attached to
1849:             */
1850:            public void startDeadlockDetectionThread(VirtualDatabase vdb) {
1851:                if (deadlockDetectionThread != null)
1852:                    throw new RuntimeException(
1853:                            "Trying to start multiple times a deadlock detection thread on the same backend "
1854:                                    + backend.getName());
1855:
1856:                deadlockDetectionThread = new DeadlockDetectionThread(backend,
1857:                        vdb, atomicPostSyncObject, waitForCompletionPolicy
1858:                                .getDeadlockTimeoutInMs());
1859:                deadlockDetectionThread.start();
1860:            }
1861:
1862:            /**
1863:             * Terminate the Deadlock Detection Thread. Throws a RuntimeException if the
1864:             * thread was already stopped (or not started).
1865:             */
1866:            public void terminateDeadlockDetectionThread() {
1867:                if (deadlockDetectionThread == null)
1868:                    throw new RuntimeException(
1869:                            "No deadlock detection thread to stop on backend "
1870:                                    + backend.getName());
1871:
1872:                deadlockDetectionThread.kill();
1873:                deadlockDetectionThread = null;
1874:            }
1875:
1876:            /**
1877:             * Returns a <code>String</code> corresponding to the dump of the internal
1878:             * state of this BackendTaskQueues.<br />
1879:             * This method is synchronized to provide a consistent snapshot of the
1880:             * queues.
1881:             *
1882:             * @return a <code>String</code> representing the internal state of this
1883:             *         BackendTaskQueues
1884:             */
1885:            protected synchronized String dump() {
1886:                StringBuffer buff = new StringBuffer();
1887:                buff.append("Non Conflicting Requests Queue ("
1888:                        + nonConflictingRequestsQueue.size() + ")\n");
1889:                for (Iterator iter = nonConflictingRequestsQueue.iterator(); iter
1890:                        .hasNext();) {
1891:                    BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
1892:                            .next();
1893:                    buff.append("\t" + entry + "\n");
1894:                }
1895:                buff.append("Conflicting Requests Queue ("
1896:                        + conflictingRequestsQueue.size() + ")\n");
1897:                for (Iterator iter = conflictingRequestsQueue.iterator(); iter
1898:                        .hasNext();) {
1899:                    BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
1900:                            .next();
1901:                    buff.append("\t" + entry + "\n");
1902:                }
1903:                buff.append("Stored Procedures Queue ("
1904:                        + storedProcedureQueue.size() + ")\n");
1905:                for (Iterator iter = storedProcedureQueue.iterator(); iter
1906:                        .hasNext();) {
1907:                    BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter
1908:                            .next();
1909:                    buff.append("\t" + entry + "\n");
1910:                }
1911:                return buff.toString();
1912:            }
1913:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.