Source Code Cross Referenced for RAIDb1.java in  » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » loadbalancer » raidb1 » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.loadbalancer.raidb1 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004:         * Science And Control (INRIA).
0005:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006:         * Copyright (C) 2005-2006 Continuent, Inc.
0007:         * Contact: sequoia@continuent.org
0008:         *
0009:         * Licensed under the Apache License, Version 2.0 (the "License");
0010:         * you may not use this file except in compliance with the License.
0011:         * You may obtain a copy of the License at
0012:         *
0013:         * http://www.apache.org/licenses/LICENSE-2.0
0014:         *
0015:         * Unless required by applicable law or agreed to in writing, software
0016:         * distributed under the License is distributed on an "AS IS" BASIS,
0017:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018:         * See the License for the specific language governing permissions and
0019:         * limitations under the License.
0020:         *
0021:         * Initial developer(s): Emmanuel Cecchet
0022:         * Contributor(s): ______________________
0023:         */package org.continuent.sequoia.controller.loadbalancer.raidb1;
0024:
0025:        import java.sql.Connection;
0026:        import java.sql.SQLException;
0027:        import java.util.ArrayList;
0028:        import java.util.List;
0029:
0030:        import javax.management.ObjectName;
0031:
0032:        import org.continuent.sequoia.common.exceptions.BadConnectionException;
0033:        import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0034:        import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0035:        import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
0036:        import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0037:        import org.continuent.sequoia.common.i18n.Translate;
0038:        import org.continuent.sequoia.common.jmx.JmxConstants;
0039:        import org.continuent.sequoia.common.log.Trace;
0040:        import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0041:        import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0042:        import org.continuent.sequoia.controller.backend.DatabaseBackend;
0043:        import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0044:        import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0045:        import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0046:        import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0047:        import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0048:        import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0049:        import org.continuent.sequoia.controller.connection.PooledConnection;
0050:        import org.continuent.sequoia.controller.jmx.MBeanServerManager;
0051:        import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0052:        import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0053:        import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
0054:        import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueuesControl;
0055:        import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0056:        import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0057:        import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask;
0058:        import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask;
0059:        import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask;
0060:        import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
0061:        import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
0062:        import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
0063:        import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
0064:        import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0065:        import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
0066:        import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
0067:        import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteQueryTask;
0068:        import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteTask;
0069:        import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask;
0070:        import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask;
0071:        import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0072:        import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0073:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0074:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0075:        import org.continuent.sequoia.controller.requests.ParsingGranularities;
0076:        import org.continuent.sequoia.controller.requests.SelectRequest;
0077:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0078:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0079:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
0080:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0081:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0082:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0083:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0084:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0085:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0086:
0087:        /**
0088:         * RAIDb-1 load balancer.
0089:         * <p>
0090:         * This class is an abstract call because the read requests coming from the
0091:         * request controller are NOT treated here but in the subclasses. Transaction
0092:         * management and write requests are broadcasted to all backends.
0093:         *
0094:         * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0095:         * @version 1.0
0096:         */
0097:        public abstract class RAIDb1 extends AbstractLoadBalancer {
0098:            //
0099:            // How the code is organized ?
0100:            //
0101:            // 1. Member variables
0102:            // 2. Constructor(s)
0103:            // 3. Request handling
0104:            // 4. Transaction handling
0105:            // 5. Backend management
0106:            //
0107:
0108:            protected static Trace logger = Trace
0109:                    .getLogger("org.continuent.sequoia.controller.loadbalancer.RAIDb1");
0110:
0111:            protected static Trace endUserLogger = Trace
0112:                    .getLogger("org.continuent.sequoia.enduser");
0113:
0114:            /*
0115:             * Constructors
0116:             */
0117:
0118:            /**
0119:             * Creates a new RAIDb-1 Round Robin request load balancer. A new backend
0120:             * worker thread is created for each backend.
0121:             *
0122:             * @param vdb the virtual database this load balancer belongs to.
0123:             * @param waitForCompletionPolicy How many backends must complete before
0124:             *          returning the result?
0125:             * @exception Exception if an error occurs
0126:             */
0127:            public RAIDb1(VirtualDatabase vdb,
0128:                    WaitForCompletionPolicy waitForCompletionPolicy)
0129:                    throws Exception {
0130:                super (vdb, RAIDbLevels.RAIDb1, ParsingGranularities.TABLE);
0131:                this .waitForCompletionPolicy = waitForCompletionPolicy;
0132:            }
0133:
0134:            /*
0135:             * Request Handling
0136:             */
0137:
0138:            /**
0139:             * Perform a read request. If request.isMustBroadcast() is set, then the query
0140:             * is broadcasted to all nodes else a single node is chosen according to the
0141:             * load balancing policy.
0142:             *
0143:             * @param request the <code>SelectRequest</code> to execute
0144:             * @param metadataCache MetadataCache (null if none)
0145:             * @return the corresponding <code>ControllerResultSet</code>
0146:             * @exception SQLException if an error occurs
0147:             * @throws AllBackendsFailedException if all backends failed to execute the
0148:             *           request
0149:             */
0150:            public ControllerResultSet statementExecuteQuery(
0151:                    SelectRequest request, MetadataCache metadataCache)
0152:                    throws SQLException, AllBackendsFailedException {
0153:                if (request.isMustBroadcast())
0154:                    return execBroadcastReadRequest(request, metadataCache);
0155:                else
0156:                    return execSingleBackendReadRequest(request, metadataCache);
0157:            }
0158:
0159:            /**
0160:             * Implementation specific execution of a request on a single backend chosen
0161:             * according to the load balancing strategy.
0162:             *
0163:             * @param request the request to execute
0164:             * @param metadataCache the metadata cache if any or null
0165:             * @return the ResultSet
0166:             * @throws SQLException if an error occurs
0167:             */
0168:            public abstract ControllerResultSet execSingleBackendReadRequest(
0169:                    SelectRequest request, MetadataCache metadataCache)
0170:                    throws SQLException;
0171:
0172:            /**
0173:             * Broadcast a read request execution on all backends. This is similar to a
0174:             * write execution and is useful for queries such as SELECT ... FOR UPDATE.
0175:             *
0176:             * @param request the <code>SelectRequest</code> to execute
0177:             * @param metadataCache MetadataCache (null if none)
0178:             * @return the corresponding <code>ControllerResultSet</code>
0179:             * @exception SQLException if an error occurs
0180:             * @throws AllBackendsFailedException if all backends failed to execute the
0181:             *           request
0182:             * @throws NoMoreBackendException if no backend was available to execute the
0183:             *           stored procedure
0184:             */
0185:            private ControllerResultSet execBroadcastReadRequest(
0186:                    SelectRequest request, MetadataCache metadataCache)
0187:                    throws SQLException, AllBackendsFailedException,
0188:                    NoMoreBackendException {
0189:                // Handle macros
0190:                handleMacros(request);
0191:
0192:                // Total ordering for distributed virtual databases.
0193:                boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0194:                        true);
0195:
0196:                // Log lazy begin if needed
0197:                if (request.isLazyTransactionStart())
0198:                    this .vdb.getRequestManager().logLazyTransactionBegin(
0199:                            request.getTransactionId());
0200:
0201:                // Log request
0202:                if (recoveryLog != null)
0203:                    request.setLogId(recoveryLog.logRequestExecuting(request));
0204:
0205:                int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0206:                        String.valueOf(request.getId()));
0207:
0208:                // Create the task and just wait for the first node to return
0209:                StatementExecuteQueryTask task = new StatementExecuteQueryTask(
0210:                        1, nbOfThreads, request, metadataCache);
0211:
0212:                // Selects are always posted in the non conflicting queue
0213:                atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
0214:                        removeFromTotalOrderQueue);
0215:
0216:                synchronized (task) {
0217:                    if (!task.hasCompleted())
0218:                        waitForTaskCompletion(request.getTimeout() * 1000L,
0219:                                String.valueOf(request.getId()), task);
0220:
0221:                    checkTaskCompletion(task);
0222:                }
0223:
0224:                // Update log with success
0225:                if (recoveryLog != null)
0226:                    recoveryLog.logRequestCompletion(request.getLogId(), true,
0227:                            request.getExecTimeInMs());
0228:
0229:                return task.getResult();
0230:            }
0231:
0232:            /**
0233:             * Performs a write request. This request is broadcasted to all nodes.
0234:             *
0235:             * @param request an <code>AbstractWriteRequest</code>
0236:             * @return number of rows affected by the request
0237:             * @throws AllBackendsFailedException if all backends failed to execute the
0238:             *           request
0239:             * @exception NoMoreBackendException if no backends are left to execute the
0240:             *              request
0241:             * @exception SQLException if an error occurs
0242:             */
0243:            public ExecuteUpdateResult statementExecuteUpdate(
0244:                    AbstractWriteRequest request)
0245:                    throws AllBackendsFailedException, NoMoreBackendException,
0246:                    SQLException {
0247:                return ((StatementExecuteUpdateTask) execWriteRequest(request,
0248:                        false, null)).getResult();
0249:            }
0250:
0251:            /**
0252:             * Perform a write request and return the auto generated keys.
0253:             *
0254:             * @param request the request to execute
0255:             * @param metadataCache the metadataCache if any or null
0256:             * @return update count and auto generated keys.
0257:             * @throws AllBackendsFailedException if all backends failed to execute the
0258:             *           request
0259:             * @exception NoMoreBackendException if no backends are left to execute the
0260:             *              request
0261:             * @exception SQLException if an error occurs
0262:             */
0263:            public GeneratedKeysResult statementExecuteUpdateWithKeys(
0264:                    AbstractWriteRequest request, MetadataCache metadataCache)
0265:                    throws AllBackendsFailedException, NoMoreBackendException,
0266:                    SQLException {
0267:                return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(
0268:                        request, true, metadataCache)).getResult();
0269:            }
0270:
0271:            /**
0272:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
0273:             *      MetadataCache)
0274:             */
0275:            public ExecuteResult statementExecute(AbstractRequest request,
0276:                    MetadataCache metadataCache) throws SQLException,
0277:                    AllBackendsFailedException {
0278:                StatementExecuteTask task = (StatementExecuteTask) callStoredProcedure(
0279:                        request, STATEMENT_EXECUTE_TASK, metadataCache);
0280:                return task.getResult();
0281:            }
0282:
0283:            /**
0284:             * Common code for execWriteRequest(AbstractWriteRequest) and
0285:             * execWriteRequestWithKeys(AbstractWriteRequest).
0286:             * <p>
0287:             * Note that macros are processed here.
0288:             * <p>
0289:             * The result is given back in AbstractTask.getResult().
0290:             *
0291:             * @param request the request to execute
0292:             * @param useKeys true if this must give an auto generated keys ResultSet
0293:             * @param metadataCache the metadataCache if any or null
0294:             * @throws AllBackendsFailedException if all backends failed to execute the
0295:             *           request
0296:             * @throws SQLException if an error occurs
0297:             */
0298:            private AbstractTask execWriteRequest(AbstractWriteRequest request,
0299:                    boolean useKeys, MetadataCache metadataCache)
0300:                    throws AllBackendsFailedException, NoMoreBackendException,
0301:                    SQLException {
0302:                // Handle macros
0303:                handleMacros(request);
0304:
0305:                // Total ordering mainly for distributed virtual databases.
0306:                boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0307:                        true);
0308:
0309:                // Log lazy begin if needed
0310:                if (request.isLazyTransactionStart())
0311:                    this .vdb.getRequestManager().logLazyTransactionBegin(
0312:                            request.getTransactionId());
0313:
0314:                // Log request
0315:                if (recoveryLog != null)
0316:                    recoveryLog.logRequestExecuting(request);
0317:
0318:                int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0319:                        String.valueOf(request.getId()));
0320:
0321:                // Create the task
0322:                AbstractTask task;
0323:                if (useKeys)
0324:                    task = new StatementExecuteUpdateWithKeysTask(
0325:                            getNbToWait(nbOfThreads), nbOfThreads, request,
0326:                            metadataCache);
0327:                else
0328:                    task = new StatementExecuteUpdateTask(
0329:                            getNbToWait(nbOfThreads), nbOfThreads, request);
0330:
0331:                atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
0332:                        removeFromTotalOrderQueue);
0333:
0334:                try {
0335:                    synchronized (task) {
0336:                        if (!task.hasCompleted())
0337:                            waitForTaskCompletion(request.getTimeout() * 1000L,
0338:                                    String.valueOf(request.getId()), task);
0339:
0340:                        checkTaskCompletion(task);
0341:                        return task;
0342:                    }
0343:                } finally {
0344:                    if (!request.isAutoCommit()) { // Check that transaction was not aborted in parallel
0345:                        try {
0346:                            this .vdb
0347:                                    .getRequestManager()
0348:                                    .getTransactionMetaData(
0349:                                            new Long(request.getTransactionId()));
0350:                        } catch (SQLException e) { // Transaction was aborted it cannot be found anymore in the active
0351:                            // transaction list. Force an abort
0352:                            logger
0353:                                    .info("Concurrent abort detected, re-enforcing abort of transaction "
0354:                                            + request.getTransactionId());
0355:                            abort(new TransactionMetaData(request
0356:                                    .getTransactionId(), 0, request.getLogin(),
0357:                                    request.isPersistentConnection(), request
0358:                                            .getPersistentConnectionId()));
0359:                        }
0360:                    }
0361:                }
0362:            }
0363:
0364:            protected static final int STATEMENT_EXECUTE_QUERY = 0;
0365:            protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1;
0366:            protected static final int CALLABLE_STATEMENT_EXECUTE = 2;
0367:
0368:            /**
0369:             * Execute a read request on the selected backend.
0370:             *
0371:             * @param request the request to execute
0372:             * @param backend the backend that will execute the request
0373:             * @param metadataCache the metadataCache if any or null
0374:             * @return the ResultSet
0375:             * @throws SQLException if an error occurs
0376:             */
0377:            protected ControllerResultSet executeRequestOnBackend(
0378:                    SelectRequest request, DatabaseBackend backend,
0379:                    MetadataCache metadataCache) throws SQLException,
0380:                    UnreachableBackendException {
0381:                // Handle macros
0382:                handleMacros(request);
0383:
0384:                // Ok, we have a backend, let's execute the request
0385:                AbstractConnectionManager cm = backend
0386:                        .getConnectionManager(request.getLogin());
0387:
0388:                // Sanity check
0389:                if (cm == null) {
0390:                    String msg = Translate.get(
0391:                            "loadbalancer.connectionmanager.not.found",
0392:                            new String[] { request.getLogin(),
0393:                                    backend.getName() });
0394:                    logger.error(msg);
0395:                    throw new SQLException(msg);
0396:                }
0397:
0398:                // Execute the query
0399:                if (request.isAutoCommit()) {
0400:                    ControllerResultSet rs = null;
0401:                    boolean badConnection;
0402:                    do {
0403:                        badConnection = false;
0404:                        // Use a connection just for this request
0405:                        PooledConnection c = null;
0406:                        try {
0407:                            c = cm.retrieveConnectionInAutoCommit(request);
0408:                        } catch (UnreachableBackendException e1) {
0409:                            String msg = Translate
0410:                                    .get(
0411:                                            "loadbalancer.backend.disabling.unreachable",
0412:                                            backend.getName());
0413:                            logger.error(msg);
0414:                            endUserLogger.error(msg);
0415:                            disableBackend(backend, true);
0416:                            throw new UnreachableBackendException(Translate
0417:                                    .get("loadbalancer.backend.unreacheable",
0418:                                            backend.getName()));
0419:                        }
0420:
0421:                        // Sanity check
0422:                        if (c == null)
0423:                            throw new UnreachableBackendException(Translate
0424:                                    .get("loadbalancer.backend.no.connection",
0425:                                            backend.getName()));
0426:
0427:                        // Execute Query
0428:                        try {
0429:                            rs = executeStatementExecuteQueryOnBackend(request,
0430:                                    backend, null, c.getConnection(),
0431:                                    metadataCache);
0432:                            cm.releaseConnectionInAutoCommit(request, c);
0433:                        } catch (SQLException e) {
0434:                            cm.releaseConnectionInAutoCommit(request, c);
0435:                            throw SQLExceptionFactory
0436:                                    .getSQLException(
0437:                                            e,
0438:                                            Translate
0439:                                                    .get(
0440:                                                            "loadbalancer.request.failed.on.backend",
0441:                                                            new String[] {
0442:                                                                    request
0443:                                                                            .getSqlShortForm(vdb
0444:                                                                                    .getSqlShortFormLength()),
0445:                                                                    backend
0446:                                                                            .getName(),
0447:                                                                    e
0448:                                                                            .getMessage() }));
0449:                        } catch (BadConnectionException e) { // Get rid of the bad connection
0450:                            cm.deleteConnection(c);
0451:                            if (request.isPersistentConnection()) {
0452:                                cm.deletePersistentConnection(request
0453:                                        .getPersistentConnectionId());
0454:                            }
0455:                            badConnection = true;
0456:                        } catch (UnreachableBackendException e) {
0457:                            String msg = Translate
0458:                                    .get(
0459:                                            "loadbalancer.backend.disabling.unreachable",
0460:                                            backend.getName());
0461:                            logger.error(msg);
0462:                            endUserLogger.error(msg);
0463:                            disableBackend(backend, true);
0464:                            throw new UnreachableBackendException(Translate
0465:                                    .get("loadbalancer.backend.unreacheable",
0466:                                            backend.getName()));
0467:                        } catch (Throwable e) {
0468:
0469:                            logger.error("Unexpected exception:", e);
0470:                            cm.releaseConnectionInAutoCommit(request, c);
0471:                            throw new SQLException(
0472:                                    Translate
0473:                                            .get(
0474:                                                    "loadbalancer.request.failed.on.backend",
0475:                                                    new String[] {
0476:                                                            request
0477:                                                                    .getSqlShortForm(vdb
0478:                                                                            .getSqlShortFormLength()),
0479:                                                            backend.getName(),
0480:                                                            e.getMessage() }));
0481:                        }
0482:                    } while (badConnection);
0483:                    if (logger.isDebugEnabled())
0484:                        logger.debug(Translate.get("loadbalancer.execute.on",
0485:                                new String[] { String.valueOf(request.getId()),
0486:                                        backend.getName() }));
0487:                    return rs;
0488:                } else { // Inside a transaction
0489:                    Connection c;
0490:                    long tid = request.getTransactionId();
0491:
0492:                    try {
0493:                        c = backend
0494:                                .getConnectionForTransactionAndLazyBeginIfNeeded(
0495:                                        request, cm);
0496:                    } catch (UnreachableBackendException e1) {
0497:                        String msg = Translate.get(
0498:                                "loadbalancer.backend.disabling.unreachable",
0499:                                backend.getName());
0500:                        logger.error(msg);
0501:                        endUserLogger.error(msg);
0502:                        disableBackend(backend, true);
0503:                        throw new UnreachableBackendException(Translate.get(
0504:                                "loadbalancer.backend.unreacheable", backend
0505:                                        .getName()));
0506:                    } catch (NoTransactionStartWhenDisablingException e) {
0507:                        String msg = Translate.get(
0508:                                "loadbalancer.backend.is.disabling",
0509:                                new String[] {
0510:                                        request.getSqlShortForm(vdb
0511:                                                .getSqlShortFormLength()),
0512:                                        backend.getName() });
0513:                        logger.error(msg);
0514:                        throw new UnreachableBackendException(msg);
0515:                    }
0516:
0517:                    // Sanity check
0518:                    if (c == null)
0519:                        throw new SQLException(Translate.get(
0520:                                "loadbalancer.unable.retrieve.connection",
0521:                                new String[] { String.valueOf(tid),
0522:                                        backend.getName() }));
0523:
0524:                    // Execute Query
0525:                    ControllerResultSet rs = null;
0526:                    try {
0527:                        rs = executeStatementExecuteQueryOnBackend(request,
0528:                                backend, null, c, metadataCache);
0529:                    } catch (SQLException e) {
0530:                        throw SQLExceptionFactory
0531:                                .getSQLException(
0532:                                        e,
0533:                                        Translate
0534:                                                .get(
0535:                                                        "loadbalancer.request.failed.on.backend",
0536:                                                        new String[] {
0537:                                                                request
0538:                                                                        .getSqlShortForm(vdb
0539:                                                                                .getSqlShortFormLength()),
0540:                                                                backend
0541:                                                                        .getName(),
0542:                                                                e.getMessage() }));
0543:                    } catch (BadConnectionException e) { // Connection failed, so did the transaction
0544:                        // Disable the backend.
0545:                        cm.deleteConnection(tid);
0546:                        String msg = Translate
0547:                                .get(
0548:                                        "loadbalancer.backend.disabling.connection.failure",
0549:                                        backend.getName());
0550:                        logger.error(msg);
0551:                        endUserLogger.error(msg);
0552:                        disableBackend(backend, true);
0553:                        throw new UnreachableBackendException(msg);
0554:                    } catch (UnreachableBackendException e) {
0555:                        String msg = Translate.get(
0556:                                "loadbalancer.backend.disabling.unreachable",
0557:                                backend.getName());
0558:                        logger.error(msg);
0559:                        endUserLogger.error(msg);
0560:                        disableBackend(backend, true);
0561:                        throw e;
0562:                    } catch (Throwable e) {
0563:                        logger.error("Unexpected exception:", e);
0564:                        throw new SQLException(Translate.get(
0565:                                "loadbalancer.request.failed.on.backend",
0566:                                new String[] {
0567:                                        request.getSqlShortForm(vdb
0568:                                                .getSqlShortFormLength()),
0569:                                        backend.getName(), e.getMessage() }));
0570:                    }
0571:                    if (logger.isDebugEnabled())
0572:                        logger.debug(Translate.get(
0573:                                "loadbalancer.execute.transaction.on",
0574:                                new String[] { String.valueOf(tid),
0575:                                        String.valueOf(request.getId()),
0576:                                        backend.getName() }));
0577:                    return rs;
0578:                }
0579:            }
0580:
0581:            /**
0582:             * Execute a stored procedure on the selected backend.
0583:             *
0584:             * @param proc the stored procedure to execute
0585:             * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0586:             *          false if we must call CallableStatement.execute()
0587:             * @param backend the backend that will execute the request
0588:             * @param metadataCache the metadataCache if any or null
0589:             * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0590:             *         <code>ExecuteResult</code> object otherwise
0591:             * @throws SQLException if an error occurs
0592:             */
0593:            protected Object executeStoredProcedureOnBackend(
0594:                    StoredProcedure proc, boolean isExecuteQuery,
0595:                    DatabaseBackend backend, MetadataCache metadataCache)
0596:                    throws SQLException, UnreachableBackendException {
0597:                // Ok, we have a backend, let's execute the request
0598:                AbstractConnectionManager cm = backend
0599:                        .getConnectionManager(proc.getLogin());
0600:
0601:                // Sanity check
0602:                if (cm == null) {
0603:                    String msg = Translate
0604:                            .get("loadbalancer.connectionmanager.not.found",
0605:                                    new String[] { proc.getLogin(),
0606:                                            backend.getName() });
0607:                    logger.error(msg);
0608:                    throw new SQLException(msg);
0609:                }
0610:
0611:                // Execute the query
0612:                if (proc.isAutoCommit()) {
0613:                    Object result = null;
0614:                    boolean badConnection;
0615:                    PooledConnection c = null;
0616:                    do {
0617:                        badConnection = false;
0618:                        PooledConnection previousConnection = c;
0619:                        // Use a connection just for this request
0620:                        try {
0621:                            c = cm.retrieveConnectionInAutoCommit(proc);
0622:                        } catch (UnreachableBackendException e1) {
0623:                            String msg = Translate
0624:                                    .get(
0625:                                            "loadbalancer.backend.disabling.unreachable",
0626:                                            backend.getName());
0627:                            logger.error(msg);
0628:                            endUserLogger.error(msg);
0629:                            disableBackend(backend, true);
0630:                            throw new UnreachableBackendException(Translate
0631:                                    .get("loadbalancer.backend.unreacheable",
0632:                                            backend.getName()));
0633:                        }
0634:
0635:                        // Sanity check
0636:                        if (c == null || c == previousConnection)
0637:                            throw new UnreachableBackendException(Translate
0638:                                    .get("loadbalancer.backend.no.connection",
0639:                                            backend.getName()));
0640:
0641:                        // Execute Query
0642:                        try {
0643:                            if (isExecuteQuery)
0644:                                result = AbstractLoadBalancer
0645:                                        .executeCallableStatementExecuteQueryOnBackend(
0646:                                                proc, backend, null, c
0647:                                                        .getConnection(),
0648:                                                metadataCache);
0649:                            else
0650:                                result = AbstractLoadBalancer
0651:                                        .executeCallableStatementExecuteOnBackend(
0652:                                                proc, backend, null, c,
0653:                                                metadataCache);
0654:                        } catch (BadConnectionException e) { // Get rid of the bad connection
0655:                            cm.deleteConnection(c);
0656:                            if (proc.isPersistentConnection())
0657:                                cm.deletePersistentConnection(proc
0658:                                        .getPersistentConnectionId());
0659:                            badConnection = true;
0660:                        } catch (Throwable e) {
0661:                            logger.error("Unexpected exception:", e);
0662:                            throw new SQLException(
0663:                                    Translate
0664:                                            .get(
0665:                                                    "loadbalancer.storedprocedure.failed.on.backend",
0666:                                                    new String[] {
0667:                                                            proc
0668:                                                                    .getSqlShortForm(vdb
0669:                                                                            .getSqlShortFormLength()),
0670:                                                            backend.getName(),
0671:                                                            e.getMessage() }));
0672:                        } finally {
0673:                            cm.releaseConnectionInAutoCommit(proc, c);
0674:                        }
0675:                    } while (badConnection);
0676:
0677:                    if (logger.isDebugEnabled())
0678:                        logger.debug(Translate.get(
0679:                                "loadbalancer.storedprocedure.on",
0680:                                new String[] { String.valueOf(proc.getId()),
0681:                                        backend.getName() }));
0682:
0683:                    return result;
0684:                } else { // Inside a transaction
0685:                    Connection c;
0686:                    long tid = proc.getTransactionId();
0687:
0688:                    try {
0689:                        c = backend
0690:                                .getConnectionForTransactionAndLazyBeginIfNeeded(
0691:                                        proc, cm);
0692:                    } catch (UnreachableBackendException e) {
0693:                        // intercept the UBE to disable the unreachable backend
0694:                        // and propagate the exception
0695:                        endUserLogger.error(Translate.get(
0696:                                "loadbalancer.backend.disabling.unreachable",
0697:                                backend.getName()));
0698:                        disableBackend(backend, true);
0699:                        throw e;
0700:                    } catch (NoTransactionStartWhenDisablingException e) {
0701:                        String msg = Translate.get(
0702:                                "loadbalancer.backend.is.disabling",
0703:                                new String[] {
0704:                                        proc.getSqlShortForm(vdb
0705:                                                .getSqlShortFormLength()),
0706:                                        backend.getName() });
0707:                        logger.error(msg);
0708:                        throw new UnreachableBackendException(msg);
0709:                    }
0710:
0711:                    // Sanity check
0712:                    if (c == null)
0713:                        throw new SQLException(Translate.get(
0714:                                "loadbalancer.unable.retrieve.connection",
0715:                                new String[] { String.valueOf(tid),
0716:                                        backend.getName() }));
0717:
0718:                    // Execute Query
0719:                    try {
0720:                        if (logger.isDebugEnabled())
0721:                            logger.debug(Translate.get(
0722:                                    "loadbalancer.execute.transaction.on",
0723:                                    new String[] { String.valueOf(tid),
0724:                                            String.valueOf(proc.getId()),
0725:                                            backend.getName() }));
0726:                        if (isExecuteQuery)
0727:                            return AbstractLoadBalancer
0728:                                    .executeCallableStatementExecuteQueryOnBackend(
0729:                                            proc, backend, null, c,
0730:                                            metadataCache);
0731:                        else
0732:                            return AbstractLoadBalancer
0733:                                    .executeCallableStatementExecuteOnBackend(
0734:                                            proc,
0735:                                            backend,
0736:                                            null,
0737:                                            cm
0738:                                                    .retrieveConnectionForTransaction(tid),
0739:                                            metadataCache);
0740:                    } catch (BadConnectionException e) { // Connection failed, so did the transaction
0741:                        // Disable the backend.
0742:                        cm.deleteConnection(tid);
0743:                        String msg = Translate
0744:                                .get(
0745:                                        "loadbalancer.backend.disabling.connection.failure",
0746:                                        backend.getName());
0747:                        logger.error(msg);
0748:                        endUserLogger.error(msg);
0749:                        disableBackend(backend, true);
0750:                        throw new UnreachableBackendException(msg);
0751:                    } catch (Throwable e) {
0752:                        logger.error("Unexpected exception:", e);
0753:                        throw new SQLException(
0754:                                Translate
0755:                                        .get(
0756:                                                "loadbalancer.storedprocedure.failed.on.backend",
0757:                                                new String[] {
0758:                                                        proc
0759:                                                                .getSqlShortForm(vdb
0760:                                                                        .getSqlShortFormLength()),
0761:                                                        backend.getName(),
0762:                                                        e.getMessage() }));
0763:                    }
0764:                }
0765:            }
0766:
0767:            /**
0768:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
0769:             *      MetadataCache)
0770:             */
0771:            public ControllerResultSet callableStatementExecuteQuery(
0772:                    StoredProcedure proc, MetadataCache metadataCache)
0773:                    throws SQLException, AllBackendsFailedException {
0774:                CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure(
0775:                        proc, EXECUTE_QUERY_TASK, metadataCache);
0776:                return task.getResult();
0777:            }
0778:
0779:            /**
0780:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0781:             */
0782:            public ExecuteUpdateResult callableStatementExecuteUpdate(
0783:                    StoredProcedure proc) throws SQLException,
0784:                    AllBackendsFailedException {
0785:                CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure(
0786:                        proc, EXECUTE_UPDATE_TASK, null);
0787:                return task.getResult();
0788:            }
0789:
0790:            /**
0791:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(StoredProcedure,
0792:             *      MetadataCache)
0793:             */
0794:            public ExecuteResult callableStatementExecute(StoredProcedure proc,
0795:                    MetadataCache metadataCache) throws SQLException,
0796:                    AllBackendsFailedException {
0797:                CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure(
0798:                        proc, CALLABLE_EXECUTE_TASK, metadataCache);
0799:                return task.getResult();
0800:            }
0801:
0802:            private static final int EXECUTE_QUERY_TASK = 0;
0803:            private static final int EXECUTE_UPDATE_TASK = 1;
0804:            private static final int CALLABLE_EXECUTE_TASK = 2;
0805:            private static final int STATEMENT_EXECUTE_TASK = 3;
0806:
0807:            /**
0808:             * Post the stored procedure call in the threads task list.
0809:             * <p>
0810:             * Note that macros are also processed here.
0811:             *
0812:             * @param request the stored procedure to call or the request to execute if
0813:             *          taskType is STATEMENT_EXECUTE_TASK
0814:             * @param taskType one of EXECUTE_QUERY_TASK, EXECUTE_UPDATE_TASK,
0815:             *          CALLABLE_EXECUTE_TASK or STATEMENT_EXECUTE_TASK
0816:             * @param metadataCache the metadataCache if any or null
0817:             * @return the task that has been executed (caller can get the result by
0818:             *         calling getResult())
0819:             * @throws SQLException if an error occurs
0820:             * @throws AllBackendsFailedException if all backends failed to execute the
0821:             *           stored procedure
0822:             * @throws NoMoreBackendException if no backend was available to execute the
0823:             *           stored procedure
0824:             */
0825:            private AbstractTask callStoredProcedure(AbstractRequest request,
0826:                    int taskType, MetadataCache metadataCache)
0827:                    throws SQLException, AllBackendsFailedException,
0828:                    NoMoreBackendException {
0829:                // Handle macros
0830:                handleMacros(request);
0831:
0832:                // Total ordering mainly for distributed virtual databases.
0833:                boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0834:                        true);
0835:
0836:                // Log lazy begin if needed
0837:                if (request.isLazyTransactionStart())
0838:                    this .vdb.getRequestManager().logLazyTransactionBegin(
0839:                            request.getTransactionId());
0840:
0841:                // Log request
0842:                if (recoveryLog != null) {
0843:                    boolean mustLog = !request.isReadOnly();
0844:                    if (taskType != STATEMENT_EXECUTE_TASK) { // faster than (request instanceof StoredProcedure)
0845:                        DatabaseProcedureSemantic semantic = ((StoredProcedure) request)
0846:                                .getSemantic();
0847:                        mustLog = (semantic == null) || semantic.isWrite();
0848:                    }
0849:                    if (mustLog)
0850:                        recoveryLog.logRequestExecuting(request);
0851:                }
0852:
0853:                int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0854:                        String.valueOf(request.getId()));
0855:
0856:                // Create the task
0857:                AbstractTask task;
0858:                switch (taskType) {
0859:                case EXECUTE_QUERY_TASK:
0860:                    task = new CallableStatementExecuteQueryTask(
0861:                            getNbToWait(nbOfThreads), nbOfThreads,
0862:                            (StoredProcedure) request, metadataCache);
0863:                    break;
0864:                case EXECUTE_UPDATE_TASK:
0865:                    task = new CallableStatementExecuteUpdateTask(
0866:                            getNbToWait(nbOfThreads), nbOfThreads,
0867:                            (StoredProcedure) request);
0868:                    break;
0869:                case CALLABLE_EXECUTE_TASK:
0870:                    task = new CallableStatementExecuteTask(
0871:                            getNbToWait(nbOfThreads), nbOfThreads,
0872:                            (StoredProcedure) request, metadataCache);
0873:                    break;
0874:                case STATEMENT_EXECUTE_TASK:
0875:                    task = new StatementExecuteTask(getNbToWait(nbOfThreads),
0876:                            nbOfThreads, (AbstractWriteRequest) request,
0877:                            metadataCache);
0878:                    break;
0879:                default:
0880:                    throw new RuntimeException("Unhandled task type "
0881:                            + taskType + " in callStoredProcedure");
0882:                }
0883:
0884:                atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
0885:                        removeFromTotalOrderQueue);
0886:
0887:                try {
0888:                    synchronized (task) {
0889:                        if (!task.hasCompleted())
0890:                            waitForTaskCompletion(request.getTimeout() * 1000L,
0891:                                    String.valueOf(request.getId()), task);
0892:
0893:                        checkTaskCompletion(task);
0894:                        return task;
0895:                    }
0896:                } finally {
0897:                    if (!request.isAutoCommit()) { // Check that transaction was not aborted in parallel
0898:                        try {
0899:                            this .vdb
0900:                                    .getRequestManager()
0901:                                    .getTransactionMetaData(
0902:                                            new Long(request.getTransactionId()));
0903:                        } catch (SQLException e) { // Transaction was aborted it cannot be found anymore in the active
0904:                            // transaction list. Force an abort
0905:                            logger
0906:                                    .info("Concurrent abort detected, re-inforcing abort of transaction "
0907:                                            + request.getTransactionId());
0908:                            abort(new TransactionMetaData(request
0909:                                    .getTransactionId(), 0, request.getLogin(),
0910:                                    request.isPersistentConnection(), request
0911:                                            .getPersistentConnectionId()));
0912:                        }
0913:                    }
0914:                }
0915:            }
0916:
0917:            /**
0918:             * Check the completion status of the task and throws appropriate Exceptions
0919:             * if the status of the task was not successful, otherwise do nothing.
0920:             *
0921:             * @param task the completed AbstractTask
0922:             * @throws AllBackendsFailedException if all backends failed to execute the
0923:             *           request
0924:             * @exception NoMoreBackendException if no backends are left to execute the
0925:             *              request
0926:             * @exception SQLException if an error occurs
0927:             */
0928:            private void checkTaskCompletion(AbstractTask task)
0929:                    throws NoMoreBackendException, AllBackendsFailedException,
0930:                    SQLException {
0931:                AbstractRequest request = task.getRequest();
0932:
0933:                if (task.getSuccess() > 0)
0934:                    return;
0935:
0936:                // Check that someone failed, it might be the case that we only have
0937:                // disabling backends left and they have not played this query (thus
0938:                // none of them have succeeded or failed).
0939:                if (task.getFailed() == 0) {
0940:                    throw new NoMoreBackendException(Translate
0941:                            .get("loadbalancer.backendlist.empty"));
0942:                }
0943:
0944:                if (task.getSuccess() == 0) {
0945:                    // All backends that executed the query failed
0946:                    List exceptions = task.getExceptions();
0947:                    if (exceptions == null)
0948:                        throw new AllBackendsFailedException(Translate.get(
0949:                                "loadbalancer.request.failed.all",
0950:                                new Object[] { request.getType(),
0951:                                        String.valueOf(request.getId()) }));
0952:                    else {
0953:                        String errorMsg = Translate.get(
0954:                                "loadbalancer.request.failed.stack",
0955:                                new Object[] { request.getType(),
0956:                                        String.valueOf(request.getId()) })
0957:                                + "\n";
0958:                        SQLException ex = SQLExceptionFactory.getSQLException(
0959:                                exceptions, errorMsg);
0960:                        logger.error(ex.getMessage());
0961:                        throw ex;
0962:                    }
0963:                }
0964:            }
0965:
0966:            /**
0967:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
0968:             */
0969:            public ControllerResultSet getPreparedStatementGetMetaData(
0970:                    AbstractRequest request) throws SQLException {
0971:                // Choose a backend
0972:                try {
0973:                    vdb.acquireReadLockBackendLists();
0974:                } catch (InterruptedException e) {
0975:                    String msg = Translate.get(
0976:                            "loadbalancer.backendlist.acquire.readlock.failed",
0977:                            e);
0978:                    logger.error(msg);
0979:                    throw new SQLException(msg);
0980:                }
0981:
0982:                /*
0983:                 * The backend that will execute the query
0984:                 */
0985:                DatabaseBackend backend = null;
0986:
0987:                // Note that vdb lock is released in the finally clause of this try/catch
0988:                // block
0989:                try {
0990:                    ArrayList backends = vdb.getBackends();
0991:                    int size = backends.size();
0992:
0993:                    if (size == 0)
0994:                        throw new SQLException(Translate.get(
0995:                                "loadbalancer.execute.no.backend.available",
0996:                                request.getId()));
0997:
0998:                    // Choose the first available backend
0999:                    for (int i = 0; i < size; i++) {
1000:                        DatabaseBackend b = (DatabaseBackend) backends.get(i);
1001:                        if (b.isReadEnabled()) {
1002:                            backend = b;
1003:                            break;
1004:                        }
1005:                    }
1006:                } catch (Throwable e) {
1007:                    String msg = Translate.get(
1008:                            "loadbalancer.execute.find.backend.failed",
1009:                            new String[] {
1010:                                    request.getSqlShortForm(vdb
1011:                                            .getSqlShortFormLength()),
1012:                                    e.getMessage() });
1013:                    logger.error(msg, e);
1014:                    throw new SQLException(msg);
1015:                } finally {
1016:                    vdb.releaseReadLockBackendLists();
1017:                }
1018:
1019:                if (backend == null)
1020:                    throw new NoMoreBackendException(Translate.get(
1021:                            "loadbalancer.execute.no.backend.enabled", request
1022:                                    .getId()));
1023:
1024:                // Ok, we have a backend, let's execute the request
1025:                AbstractConnectionManager cm = backend
1026:                        .getConnectionManager(request.getLogin());
1027:
1028:                // Sanity check
1029:                if (cm == null) {
1030:                    String msg = Translate.get(
1031:                            "loadbalancer.connectionmanager.not.found",
1032:                            new String[] { request.getLogin(),
1033:                                    backend.getName() });
1034:                    logger.error(msg);
1035:                    throw new SQLException(msg);
1036:                }
1037:
1038:                // Execute the query
1039:                if (request.isAutoCommit()) {
1040:                    ControllerResultSet rs = null;
1041:                    boolean badConnection;
1042:                    do {
1043:                        badConnection = false;
1044:                        // Use a connection just for this request
1045:                        PooledConnection c = null;
1046:                        try {
1047:                            c = cm.retrieveConnectionInAutoCommit(request);
1048:                        } catch (UnreachableBackendException e1) {
1049:                            String msg = Translate
1050:                                    .get(
1051:                                            "loadbalancer.backend.disabling.unreachable",
1052:                                            backend.getName());
1053:                            logger.error(msg);
1054:                            endUserLogger.error(msg);
1055:                            disableBackend(backend, true);
1056:                            // Retry on a different backend
1057:                            return getPreparedStatementGetMetaData(request);
1058:                        }
1059:
1060:                        // Sanity check
1061:                        if (c == null)
1062:                            throw new SQLException(Translate.get(
1063:                                    "loadbalancer.backend.no.connection",
1064:                                    backend.getName()));
1065:
1066:                        // Execute Query
1067:                        try {
1068:                            rs = preparedStatementGetMetaDataOnBackend(request
1069:                                    .getSqlOrTemplate(), backend, c
1070:                                    .getConnection());
1071:                            cm.releaseConnectionInAutoCommit(request, c);
1072:                        } catch (SQLException e) {
1073:                            cm.releaseConnectionInAutoCommit(request, c);
1074:                            throw SQLExceptionFactory
1075:                                    .getSQLException(
1076:                                            e,
1077:                                            Translate
1078:                                                    .get(
1079:                                                            "loadbalancer.request.failed.on.backend",
1080:                                                            new String[] {
1081:                                                                    request
1082:                                                                            .getSqlShortForm(vdb
1083:                                                                                    .getSqlShortFormLength()),
1084:                                                                    backend
1085:                                                                            .getName(),
1086:                                                                    e
1087:                                                                            .getMessage() }));
1088:                        } catch (BadConnectionException e) { // Get rid of the bad connection
1089:                            cm.deleteConnection(c);
1090:                            badConnection = true;
1091:                        } catch (Throwable e) {
1092:                            cm.releaseConnectionInAutoCommit(request, c);
1093:
1094:                            logger.error("Unexpected exception:", e);
1095:                            throw new SQLException(
1096:                                    Translate
1097:                                            .get(
1098:                                                    "loadbalancer.request.failed.on.backend",
1099:                                                    new String[] {
1100:                                                            request
1101:                                                                    .getSqlShortForm(vdb
1102:                                                                            .getSqlShortFormLength()),
1103:                                                            backend.getName(),
1104:                                                            e.getMessage() }));
1105:                        }
1106:                    } while (badConnection);
1107:                    if (logger.isDebugEnabled())
1108:                        logger.debug(Translate.get("loadbalancer.execute.on",
1109:                                new String[] { String.valueOf(request.getId()),
1110:                                        backend.getName() }));
1111:                    return rs;
1112:                } else { // Inside a transaction
1113:                    Connection c;
1114:                    long tid = request.getTransactionId();
1115:
1116:                    try {
1117:                        c = backend
1118:                                .getConnectionForTransactionAndLazyBeginIfNeeded(
1119:                                        request, cm);
1120:                    } catch (UnreachableBackendException e1) {
1121:                        String msg = Translate.get(
1122:                                "loadbalancer.backend.disabling.unreachable",
1123:                                backend.getName());
1124:                        logger.error(msg);
1125:                        endUserLogger.error(msg);
1126:                        disableBackend(backend, true);
1127:                        throw new SQLException(Translate.get(
1128:                                "loadbalancer.backend.unreacheable", backend
1129:                                        .getName()));
1130:                    } catch (NoTransactionStartWhenDisablingException e) {
1131:                        String msg = Translate.get(
1132:                                "loadbalancer.backend.is.disabling",
1133:                                new String[] {
1134:                                        request.getSqlShortForm(vdb
1135:                                                .getSqlShortFormLength()),
1136:                                        backend.getName() });
1137:                        logger.error(msg);
1138:                        throw new SQLException(msg);
1139:                    }
1140:
1141:                    // Sanity check
1142:                    if (c == null)
1143:                        throw new SQLException(Translate.get(
1144:                                "loadbalancer.unable.retrieve.connection",
1145:                                new String[] { String.valueOf(tid),
1146:                                        backend.getName() }));
1147:
1148:                    // Execute Query
1149:                    ControllerResultSet rs = null;
1150:                    try {
1151:                        rs = preparedStatementGetMetaDataOnBackend(request
1152:                                .getSqlOrTemplate(), backend, c);
1153:                    } catch (SQLException e) {
1154:                        throw e;
1155:                    } catch (BadConnectionException e) { // Connection failed, so did the transaction
1156:                        // Disable the backend.
1157:                        cm.deleteConnection(tid);
1158:                        String msg = Translate
1159:                                .get(
1160:                                        "loadbalancer.backend.disabling.connection.failure",
1161:                                        backend.getName());
1162:                        logger.error(msg);
1163:                        endUserLogger.error(msg);
1164:                        disableBackend(backend, true);
1165:                        throw new SQLException(msg);
1166:                    } catch (Throwable e) {
1167:
1168:                        logger.error("Unexpected exception:", e);
1169:                        throw new SQLException(Translate.get(
1170:                                "loadbalancer.request.failed.on.backend",
1171:                                new String[] {
1172:                                        request.getSqlShortForm(vdb
1173:                                                .getSqlShortFormLength()),
1174:                                        backend.getName(), e.getMessage() }));
1175:                    }
1176:                    if (logger.isDebugEnabled())
1177:                        logger.debug(Translate.get(
1178:                                "loadbalancer.execute.transaction.on",
1179:                                new String[] { String.valueOf(tid),
1180:                                        String.valueOf(request.getId()),
1181:                                        backend.getName() }));
1182:                    return rs;
1183:                }
1184:            }
1185:
1186:            /*
1187:             * Transaction management
1188:             */
1189:
1190:            /**
1191:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1192:             */
1193:            public void abort(TransactionMetaData tm) throws SQLException {
1194:                long tid = tm.getTransactionId();
1195:                boolean executeRollback = false;
1196:                DistributedRollback toqObject = null;
1197:                /*
1198:                 * Let previous queries be flushed into the load balancer queues so that we
1199:                 * can abort them and that no queries for that transaction wait in the total
1200:                 * order queue while we are aborting. Note that the wait and remove from the
1201:                 * total order queue will be done in the call to rollback at the end of this
1202:                 * method.
1203:                 */
1204:                if (vdb.getTotalOrderQueue() != null) {
1205:                    toqObject = new DistributedRollback(tm.getLogin(), tid);
1206:                    waitForTotalOrder(toqObject, false);
1207:                }
1208:
1209:                try {
1210:                    // Acquire the lock
1211:                    String requestDescription = "abort " + tid;
1212:                    int nbOfThreads = acquireLockAndCheckNbOfThreads(toqObject,
1213:                            requestDescription);
1214:
1215:                    boolean rollbackInProgress = false;
1216:                    synchronized (enabledBackends) {
1217:                        // Abort all queries on all backends that have started this transaction
1218:                        for (int i = 0; i < nbOfThreads; i++) {
1219:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1220:                                    .get(i);
1221:                            rollbackInProgress = rollbackInProgress
1222:                                    || backend.getTaskQueues()
1223:                                            .abortAllQueriesForTransaction(tid);
1224:                        }
1225:                    }
1226:
1227:                    // Release the lock
1228:                    backendListLock.releaseRead();
1229:
1230:                    if (rollbackInProgress) { // already aborting
1231:                        if (vdb.getTotalOrderQueue() != null)
1232:                            removeObjectFromAndNotifyTotalOrderQueue(toqObject);
1233:                        return;
1234:                    }
1235:
1236:                    executeRollback = true;
1237:                    rollback(tm);
1238:                } catch (NoMoreBackendException ignore) {
1239:                    if (!executeRollback && (recoveryLog != null))
1240:                        recoveryLog.logAbort(tm); // Executing status
1241:                }
1242:            }
1243:
1244:            /**
1245:             * Begins a new transaction.
1246:             *
1247:             * @param tm the transaction marker metadata
1248:             * @exception SQLException if an error occurs
1249:             */
1250:            public final void begin(TransactionMetaData tm) throws SQLException {
1251:            }
1252:
1253:            /**
1254:             * Commits a transaction.
1255:             *
1256:             * @param tm the transaction marker metadata
1257:             * @exception SQLException if an error occurs
1258:             */
1259:            public void commit(TransactionMetaData tm) throws SQLException {
1260:                long tid = tm.getTransactionId();
1261:                Long lTid = new Long(tid);
1262:
1263:                // Ordering for distributed virtual database
1264:                boolean canTakeReadLock = false;
1265:                DistributedCommit totalOrderCommit = null;
1266:                if (vdb.getTotalOrderQueue() != null) {
1267:                    // Total ordering mainly for distributed virtual databases.
1268:                    // If waitForTotalOrder returns true then the query has been scheduled in
1269:                    // total order and there is no need to take a write lock later to resolve
1270:                    // potential conflicts.
1271:                    totalOrderCommit = new DistributedCommit(tm.getLogin(), tid);
1272:                    canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
1273:                    if (!canTakeReadLock)
1274:                        // This is a local commit no total order info
1275:                        totalOrderCommit = null;
1276:                }
1277:
1278:                // Update the recovery log
1279:                if (recoveryLog != null)
1280:                    recoveryLog.logCommit(tm);
1281:
1282:                // Acquire the lock
1283:                String requestDescription = "commit " + tid;
1284:                int nbOfThreads = acquireLockAndCheckNbOfThreads(
1285:                        totalOrderCommit, requestDescription);
1286:
1287:                // Build the list of backends that need to commit this transaction
1288:                ArrayList commitList = new ArrayList(nbOfThreads);
1289:                for (int i = 0; i < nbOfThreads; i++) {
1290:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1291:                            .get(i);
1292:                    if (backend.isStartedTransaction(lTid))
1293:                        commitList.add(backend);
1294:                }
1295:
1296:                int nbOfThreadsToCommit = commitList.size();
1297:                CommitTask task = null;
1298:                if (nbOfThreadsToCommit != 0)
1299:                    task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1300:                            nbOfThreadsToCommit, tm);
1301:
1302:                // Post the task in the non-conflicting queues.
1303:                synchronized (enabledBackends) {
1304:                    for (int i = 0; i < nbOfThreadsToCommit; i++) {
1305:                        DatabaseBackend backend = (DatabaseBackend) commitList
1306:                                .get(i);
1307:                        backend.getTaskQueues()
1308:                                .addTaskToBackendTotalOrderQueue(task);
1309:                    }
1310:                }
1311:
1312:                // Release the lock
1313:                backendListLock.releaseRead();
1314:
1315:                // Unblock next query from total order queue
1316:                if (totalOrderCommit != null)
1317:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1318:
1319:                // Check if someone had something to commit
1320:                if (task == null)
1321:                    return;
1322:
1323:                synchronized (task) {
1324:                    if (!task.hasCompleted())
1325:                        waitForTaskCompletion(tm.getTimeout(),
1326:                                requestDescription, task);
1327:
1328:                    if (task.getSuccess() == 0) { // All tasks failed
1329:                        List exceptions = task.getExceptions();
1330:                        if (exceptions == null)
1331:                            throw new SQLException(Translate.get(
1332:                                    "loadbalancer.commit.all.failed", tid));
1333:                        else {
1334:                            String errorMsg = Translate.get(
1335:                                    "loadbalancer.commit.failed.stack", tid)
1336:                                    + "\n";
1337:                            SQLException ex = SQLExceptionFactory
1338:                                    .getSQLException(exceptions, errorMsg);
1339:                            logger.error(ex.getMessage());
1340:                            throw ex;
1341:                        }
1342:                    }
1343:                }
1344:            }
1345:
1346:            /**
1347:             * Rollbacks a transaction.
1348:             *
1349:             * @param tm the transaction marker metadata
1350:             * @exception SQLException if an error occurs
1351:             */
1352:            public void rollback(TransactionMetaData tm) throws SQLException {
1353:                long tid = tm.getTransactionId();
1354:                Long lTid = new Long(tid);
1355:
1356:                // Ordering for distributed virtual database
1357:                DistributedRollback totalOrderRollback = null;
1358:                boolean canTakeReadLock = false;
1359:                if (vdb.getTotalOrderQueue() != null) {
1360:                    totalOrderRollback = new DistributedRollback(tm.getLogin(),
1361:                            tid);
1362:                    // Total ordering mainly for distributed virtual databases.
1363:                    // If waitForTotalOrder returns true then the query has been scheduled in
1364:                    // total order and there is no need to take a write lock later to resolve
1365:                    // potential conflicts.
1366:                    canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1367:                            false);
1368:                    if (!canTakeReadLock)
1369:                        // This is a local rollback no total order info
1370:                        totalOrderRollback = null;
1371:                }
1372:
1373:                // Update the recovery log
1374:                if (recoveryLog != null)
1375:                    recoveryLog.logRollback(tm);
1376:
1377:                // Acquire the lock
1378:                String requestDescription = "rollback " + tid;
1379:                int nbOfThreads = acquireLockAndCheckNbOfThreads(
1380:                        totalOrderRollback, requestDescription);
1381:
1382:                // Build the list of backends that need to rollback this transaction
1383:                ArrayList rollbackList = new ArrayList();
1384:                for (int i = 0; i < nbOfThreads; i++) {
1385:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1386:                            .get(i);
1387:                    if (backend.isStartedTransaction(lTid))
1388:                        rollbackList.add(backend);
1389:                }
1390:
1391:                int nbOfThreadsToRollback = rollbackList.size();
1392:                RollbackTask task = null;
1393:                task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1394:                        nbOfThreadsToRollback, tm);
1395:
1396:                // Post the task in the non-conflicting queues.
1397:                synchronized (enabledBackends) {
1398:                    for (int i = 0; i < nbOfThreadsToRollback; i++) {
1399:                        DatabaseBackend backend = (DatabaseBackend) rollbackList
1400:                                .get(i);
1401:                        backend.getTaskQueues()
1402:                                .addTaskToBackendTotalOrderQueue(task);
1403:                    }
1404:                }
1405:
1406:                // Release the lock
1407:                backendListLock.releaseRead();
1408:
1409:                // Unblock next query from total order queue
1410:                if (totalOrderRollback != null)
1411:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1412:
1413:                // Check if someone had something to rollback
1414:                if (nbOfThreadsToRollback == 0)
1415:                    return;
1416:
1417:                synchronized (task) {
1418:                    if (!task.hasCompleted())
1419:                        waitForTaskCompletion(tm.getTimeout(),
1420:                                requestDescription, task);
1421:
1422:                    if (task.getSuccess() > 0)
1423:                        return;
1424:
1425:                    // All tasks failed
1426:                    List exceptions = task.getExceptions();
1427:                    if (exceptions == null)
1428:                        throw new SQLException(Translate.get(
1429:                                "loadbalancer.rollback.all.failed", tid));
1430:                    else {
1431:                        String errorMsg = Translate.get(
1432:                                "loadbalancer.rollback.failed.stack", tid)
1433:                                + "\n";
1434:                        SQLException ex = SQLExceptionFactory.getSQLException(
1435:                                exceptions, errorMsg);
1436:                        logger.error(ex.getMessage());
1437:                        throw ex;
1438:                    }
1439:                }
1440:            }
1441:
1442:            /**
1443:             * Rollback a transaction to a savepoint
1444:             *
1445:             * @param tm The transaction marker metadata
1446:             * @param savepointName The name of the savepoint
1447:             * @throws SQLException if an error occurs
1448:             */
1449:            public void rollbackToSavepoint(TransactionMetaData tm,
1450:                    String savepointName) throws SQLException {
1451:                long tid = tm.getTransactionId();
1452:                Long lTid = new Long(tid);
1453:
1454:                // Ordering for distributed virtual database
1455:                DistributedRollbackToSavepoint totalOrderRollback = null;
1456:                boolean canTakeReadLock = false;
1457:                if (vdb.getTotalOrderQueue() != null) {
1458:                    totalOrderRollback = new DistributedRollbackToSavepoint(
1459:                            tid, savepointName);
1460:                    // Total ordering mainly for distributed virtual databases.
1461:                    // If waitForTotalOrder returns true then the query has been scheduled in
1462:                    // total order and there is no need to take a write lock later to resolve
1463:                    // potential conflicts.
1464:                    canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1465:                            false);
1466:                    if (!canTakeReadLock)
1467:                        // This is a local commit no total order info
1468:                        totalOrderRollback = null;
1469:                }
1470:
1471:                // Update the recovery log
1472:                if (recoveryLog != null)
1473:                    recoveryLog.logRollbackToSavepoint(tm, savepointName);
1474:
1475:                // Acquire the lock
1476:                String requestDescription = "rollback " + savepointName + " "
1477:                        + tid;
1478:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1479:                        requestDescription);
1480:
1481:                // Build the list of backends that need to rollback this transaction
1482:                ArrayList rollbackList = new ArrayList();
1483:                for (int i = 0; i < nbOfThreads; i++) {
1484:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1485:                            .get(i);
1486:                    if (backend.isStartedTransaction(lTid))
1487:                        rollbackList.add(backend);
1488:                }
1489:
1490:                int nbOfThreadsToRollback = rollbackList.size();
1491:                RollbackToSavepointTask task = null;
1492:                if (nbOfThreadsToRollback != 0)
1493:                    task = new RollbackToSavepointTask(
1494:                            getNbToWait(nbOfThreadsToRollback),
1495:                            nbOfThreadsToRollback, tm, savepointName);
1496:
1497:                // Post the task in the non-conflicting queues.
1498:                synchronized (enabledBackends) {
1499:                    for (int i = 0; i < nbOfThreadsToRollback; i++) {
1500:                        DatabaseBackend backend = (DatabaseBackend) rollbackList
1501:                                .get(i);
1502:                        backend.getTaskQueues()
1503:                                .addTaskToBackendTotalOrderQueue(task);
1504:                    }
1505:                }
1506:
1507:                // Release the lock
1508:                backendListLock.releaseRead();
1509:
1510:                // Unblock next query from total order queue
1511:                if (totalOrderRollback != null)
1512:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1513:
1514:                // Check if someone had something to rollback
1515:                if (task == null)
1516:                    return;
1517:
1518:                synchronized (task) {
1519:                    if (!task.hasCompleted())
1520:                        waitForTaskCompletion(tm.getTimeout(),
1521:                                requestDescription, task);
1522:
1523:                    if (task.getSuccess() == 0) { // All tasks failed
1524:                        List exceptions = task.getExceptions();
1525:                        if (exceptions == null)
1526:                            throw new SQLException(
1527:                                    Translate
1528:                                            .get(
1529:                                                    "loadbalancer.rollbacksavepoint.all.failed",
1530:                                                    new String[] {
1531:                                                            savepointName,
1532:                                                            String.valueOf(tid) }));
1533:                        else {
1534:                            String errorMsg = Translate
1535:                                    .get(
1536:                                            "loadbalancer.rollbacksavepoint.failed.stack",
1537:                                            new String[] { savepointName,
1538:                                                    String.valueOf(tid) })
1539:                                    + "\n";
1540:                            SQLException ex = SQLExceptionFactory
1541:                                    .getSQLException(exceptions, errorMsg);
1542:                            logger.error(ex.getMessage());
1543:                            throw ex;
1544:                        }
1545:                    }
1546:                }
1547:            }
1548:
1549:            /**
1550:             * Release a savepoint from a transaction
1551:             *
1552:             * @param tm The transaction marker metadata
1553:             * @param savepointName The name of the savepoint ro release
1554:             * @throws SQLException if an error occurs
1555:             */
1556:            public void releaseSavepoint(TransactionMetaData tm,
1557:                    String savepointName) throws SQLException {
1558:                long tid = tm.getTransactionId();
1559:                Long lTid = new Long(tid);
1560:
1561:                // Ordering for distributed virtual database
1562:                DistributedReleaseSavepoint totalOrderRelease = null;
1563:                boolean canTakeReadLock = false;
1564:                if (vdb.getTotalOrderQueue() != null) {
1565:                    totalOrderRelease = new DistributedReleaseSavepoint(tid,
1566:                            savepointName);
1567:                    // Total ordering mainly for distributed virtual databases.
1568:                    // If waitForTotalOrder returns true then the query has been scheduled in
1569:                    // total order and there is no need to take a write lock later to resolve
1570:                    // potential conflicts.
1571:                    canTakeReadLock = waitForTotalOrder(totalOrderRelease,
1572:                            false);
1573:                    if (!canTakeReadLock)
1574:                        // This is a local commit no total order info
1575:                        totalOrderRelease = null;
1576:                }
1577:
1578:                // Update the recovery log
1579:                if (recoveryLog != null)
1580:                    recoveryLog.logReleaseSavepoint(tm, savepointName);
1581:
1582:                // Acquire the lock
1583:                String requestDescription = "release savepoint "
1584:                        + savepointName + " " + tid;
1585:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1586:                        requestDescription);
1587:
1588:                // Build the list of backends that need to rollback this transaction
1589:                ArrayList savepointList = new ArrayList();
1590:                for (int i = 0; i < nbOfThreads; i++) {
1591:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1592:                            .get(i);
1593:                    if (backend.isStartedTransaction(lTid))
1594:                        savepointList.add(backend);
1595:                }
1596:
1597:                int nbOfSavepoints = savepointList.size();
1598:                ReleaseSavepointTask task = null;
1599:                if (nbOfSavepoints != 0)
1600:                    task = new ReleaseSavepointTask(getNbToWait(nbOfThreads),
1601:                            nbOfThreads, tm, savepointName);
1602:
1603:                // Post the task in the non-conflicting queues.
1604:                synchronized (enabledBackends) {
1605:                    for (int i = 0; i < nbOfSavepoints; i++) {
1606:                        DatabaseBackend backend = (DatabaseBackend) savepointList
1607:                                .get(i);
1608:                        backend.getTaskQueues()
1609:                                .addTaskToBackendTotalOrderQueue(task);
1610:                    }
1611:                }
1612:
1613:                // Release the lock
1614:                backendListLock.releaseRead();
1615:
1616:                // Unblock next query from total order queue
1617:                if (totalOrderRelease != null)
1618:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease);
1619:
1620:                // Check if someone had something to release
1621:                if (task == null)
1622:                    return;
1623:
1624:                synchronized (task) {
1625:                    if (!task.hasCompleted())
1626:                        waitForTaskCompletion(tm.getTimeout(),
1627:                                requestDescription, task);
1628:
1629:                    if (task.getSuccess() == 0) { // All tasks failed
1630:                        List exceptions = task.getExceptions();
1631:                        if (exceptions == null)
1632:                            throw new SQLException(Translate.get(
1633:                                    "loadbalancer.releasesavepoint.all.failed",
1634:                                    new String[] { savepointName,
1635:                                            String.valueOf(tid) }));
1636:                        else {
1637:                            String errorMsg = Translate
1638:                                    .get(
1639:                                            "loadbalancer.releasesavepoint.failed.stack",
1640:                                            new String[] { savepointName,
1641:                                                    String.valueOf(tid) })
1642:                                    + "\n";
1643:                            SQLException ex = SQLExceptionFactory
1644:                                    .getSQLException(exceptions, errorMsg);
1645:                            logger.error(ex.getMessage());
1646:                            throw ex;
1647:                        }
1648:                    }
1649:                }
1650:            }
1651:
1652:            /**
1653:             * Set a savepoint to a transaction.
1654:             *
1655:             * @param tm The transaction marker metadata
1656:             * @param savepointName The name of the new savepoint
1657:             * @throws SQLException if an error occurs
1658:             */
1659:            public void setSavepoint(TransactionMetaData tm,
1660:                    String savepointName) throws SQLException {
1661:                long tid = tm.getTransactionId();
1662:
1663:                // Ordering for distributed virtual database
1664:                DistributedSetSavepoint totalOrderSavepoint = null;
1665:                boolean canTakeReadLock = false;
1666:                if (vdb.getTotalOrderQueue() != null) {
1667:                    totalOrderSavepoint = new DistributedSetSavepoint(tm
1668:                            .getLogin(), tid, savepointName);
1669:                    // Total ordering mainly for distributed virtual databases.
1670:                    // If waitForTotalOrder returns true then the query has been scheduled in
1671:                    // total order and there is no need to take a write lock later to resolve
1672:                    // potential conflicts.
1673:                    canTakeReadLock = waitForTotalOrder(totalOrderSavepoint,
1674:                            false);
1675:                    if (!canTakeReadLock)
1676:                        // This is a local commit no total order info
1677:                        totalOrderSavepoint = null;
1678:                }
1679:
1680:                // Update the recovery log
1681:                if (recoveryLog != null)
1682:                    recoveryLog.logSetSavepoint(tm, savepointName);
1683:
1684:                // Acquire the lock
1685:                String requestDescription = "set savepoint " + savepointName
1686:                        + " " + tid;
1687:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1688:                        requestDescription);
1689:
1690:                SavepointTask task = null;
1691:
1692:                // Post the task in the non-conflicting queues of all backends.
1693:                synchronized (enabledBackends) {
1694:                    if (nbOfThreads != 0) {
1695:                        task = new SavepointTask(getNbToWait(nbOfThreads),
1696:                                nbOfThreads, tm, savepointName);
1697:                        for (int i = 0; i < nbOfThreads; i++) {
1698:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1699:                                    .get(i);
1700:                            backend.getTaskQueues()
1701:                                    .addTaskToBackendTotalOrderQueue(task);
1702:                        }
1703:                    }
1704:                }
1705:
1706:                // Release the lock
1707:                backendListLock.releaseRead();
1708:
1709:                // Unblock next query from total order queue
1710:                if (totalOrderSavepoint != null)
1711:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint);
1712:
1713:                // Check if someone had something to release
1714:                if (task == null)
1715:                    return;
1716:
1717:                synchronized (task) {
1718:                    if (!task.hasCompleted())
1719:                        waitForTaskCompletion(tm.getTimeout(),
1720:                                requestDescription, task);
1721:
1722:                    if (task.getSuccess() == 0) { // All tasks failed
1723:                        List exceptions = task.getExceptions();
1724:                        if (exceptions == null)
1725:                            throw new SQLException(Translate.get(
1726:                                    "loadbalancer.setsavepoint.all.failed",
1727:                                    new String[] { savepointName,
1728:                                            String.valueOf(tid) }));
1729:                        else {
1730:                            String errorMsg = Translate.get(
1731:                                    "loadbalancer.setsavepoint.failed.stack",
1732:                                    new String[] { savepointName,
1733:                                            String.valueOf(tid) })
1734:                                    + "\n";
1735:                            SQLException ex = SQLExceptionFactory
1736:                                    .getSQLException(exceptions, errorMsg);
1737:                            logger.error(ex.getMessage());
1738:                            throw ex;
1739:                        }
1740:                    }
1741:                }
1742:            }
1743:
1744:            //
1745:            // Utility functions
1746:            //
1747:
1748:            /**
1749:             * Check in which queue the task should be posted and atomically posts the
1750:             * task in the queue of all backends. The list of locks acquired by the
1751:             * request is set on the task if the request is in autocommit mode (if it is
1752:             * in a transaction it is automatically added to the transaction lock list).
1753:             * The list of lock can be null if no lock has been acquired.
1754:             *
1755:             * @param task the task to post
1756:             * @param nbOfThreads number of threads in the backend list (must already be
1757:             *          locked)
1758:             * @param removeFromTotalOrderQueue true if the query must be removed from the
1759:             *          total order queue
1760:             */
1761:            private void atomicTaskPostInQueueAndReleaseLock(
1762:                    AbstractRequest request, AbstractTask task,
1763:                    int nbOfThreads, boolean removeFromTotalOrderQueue) {
1764:                synchronized (enabledBackends) {
1765:                    for (int i = 0; i < nbOfThreads; i++) {
1766:                        BackendTaskQueues queues = ((DatabaseBackend) enabledBackends
1767:                                .get(i)).getTaskQueues();
1768:                        queues.addTaskToBackendTotalOrderQueue(task);
1769:                    }
1770:                }
1771:
1772:                backendListLock.releaseRead();
1773:
1774:                // Unblock next query from total order queue
1775:                if (removeFromTotalOrderQueue) {
1776:                    removeObjectFromAndNotifyTotalOrderQueue(request);
1777:                }
1778:            }
1779:
1780:            /**
1781:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1782:             *      long)
1783:             */
1784:            public void closePersistentConnection(String login,
1785:                    long persistentConnectionId) throws SQLException {
1786:                /*
1787:                 * We assume a synchronous execution and connection closing can only come
1788:                 * after all requests have been executed in that connection. We post to all
1789:                 * backends and let the task deal with whether that backend had a persistent
1790:                 * connection or not.
1791:                 */
1792:
1793:                String requestDescription = "closing persistent connection "
1794:                        + persistentConnectionId;
1795:                int nbOfThreads = 0;
1796:
1797:                DistributedClosePersistentConnection totalOrderQueueObject = null;
1798:                boolean removefromTotalOrder = false;
1799:                if (vdb.getTotalOrderQueue() != null) {
1800:                    totalOrderQueueObject = new DistributedClosePersistentConnection(
1801:                            login, persistentConnectionId);
1802:                    removefromTotalOrder = waitForTotalOrder(
1803:                            totalOrderQueueObject, false);
1804:                }
1805:
1806:                ClosePersistentConnectionTask task = null;
1807:                try {
1808:                    nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1809:                            requestDescription);
1810:
1811:                    task = new ClosePersistentConnectionTask(
1812:                            getNbToWait(nbOfThreads), nbOfThreads, login,
1813:                            persistentConnectionId);
1814:
1815:                    // Post the task in the non-conflicting queues.
1816:                    synchronized (enabledBackends) {
1817:                        for (int i = 0; i < nbOfThreads; i++) {
1818:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1819:                                    .get(i);
1820:                            backend.getTaskQueues()
1821:                                    .addTaskToBackendTotalOrderQueue(task);
1822:                        }
1823:                    }
1824:
1825:                    // Release the lock
1826:                    backendListLock.releaseRead();
1827:
1828:                    if (removefromTotalOrder)
1829:                        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1830:                    totalOrderQueueObject = null;
1831:
1832:                    synchronized (task) {
1833:                        if (!task.hasCompleted())
1834:                            try {
1835:                                waitForTaskCompletion(0, requestDescription,
1836:                                        task);
1837:                            } catch (SQLException ignore) {
1838:                            }
1839:                    }
1840:                } finally {
1841:                    if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1842:                        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1843:                    }
1844:
1845:                    if (logger.isDebugEnabled())
1846:                        logger.debug(requestDescription + " completed on "
1847:                                + nbOfThreads + " backends.");
1848:                }
1849:            }
1850:
1851:            /**
1852:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1853:             *      long)
1854:             */
1855:            public void openPersistentConnection(String login,
1856:                    long persistentConnectionId) throws SQLException {
1857:                String requestDescription = "opening persistent connection "
1858:                        + persistentConnectionId;
1859:                int nbOfThreads = 0;
1860:
1861:                DistributedOpenPersistentConnection totalOrderQueueObject = null;
1862:                if (vdb.getTotalOrderQueue() != null) {
1863:                    totalOrderQueueObject = new DistributedOpenPersistentConnection(
1864:                            login, persistentConnectionId);
1865:                    waitForTotalOrder(totalOrderQueueObject, true);
1866:                }
1867:
1868:                OpenPersistentConnectionTask task = null;
1869:                try {
1870:                    nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1871:                            requestDescription);
1872:
1873:                    task = new OpenPersistentConnectionTask(
1874:                            getNbToWait(nbOfThreads), nbOfThreads, login,
1875:                            persistentConnectionId);
1876:
1877:                    // Post the task in the non-conflicting queues.
1878:                    synchronized (enabledBackends) {
1879:                        for (int i = 0; i < nbOfThreads; i++) {
1880:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1881:                                    .get(i);
1882:                            backend.getTaskQueues()
1883:                                    .addTaskToBackendTotalOrderQueue(task);
1884:                        }
1885:                    }
1886:
1887:                    // Release the lock
1888:                    backendListLock.releaseRead();
1889:
1890:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1891:                    totalOrderQueueObject = null;
1892:
1893:                    synchronized (task) {
1894:                        if (!task.hasCompleted())
1895:                            try {
1896:                                waitForTaskCompletion(0, requestDescription,
1897:                                        task);
1898:                            } catch (SQLException ignore) {
1899:                            }
1900:                    }
1901:                } finally {
1902:                    if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1903:                        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1904:                    }
1905:
1906:                    if (logger.isDebugEnabled())
1907:                        logger.debug(requestDescription + " completed on "
1908:                                + nbOfThreads + " backends.");
1909:                }
1910:            }
1911:
1912:            /**
1913:             * Enables a Backend that was previously disabled.
1914:             * <p>
1915:             * Ask the corresponding connection manager to initialize the connections if
1916:             * needed.
1917:             * <p>
1918:             * No sanity checks are performed by this function.
1919:             *
1920:             * @param db the database backend to enable
1921:             * @param writeEnabled True if the backend must be enabled for writes
1922:             * @throws SQLException if an error occurs
1923:             */
1924:            public synchronized void enableBackend(DatabaseBackend db,
1925:                    boolean writeEnabled) throws SQLException {
1926:                if (!db.isInitialized())
1927:                    db.initializeConnections();
1928:
1929:                if (writeEnabled && db.isWriteCanBeEnabled()) {
1930:                    BackendTaskQueues taskqueues = new BackendTaskQueues(db,
1931:                            waitForCompletionPolicy, this .vdb
1932:                                    .getRequestManager());
1933:                    // Create the new backend task queues
1934:                    try {
1935:                        ObjectName taskQueuesObjectName = JmxConstants
1936:                                .getBackendTaskQueuesObjectName(db
1937:                                        .getVirtualDatabaseName(), db.getName());
1938:                        if (MBeanServerManager.getInstance().isRegistered(
1939:                                taskQueuesObjectName)) {
1940:                            MBeanServerManager.unregister(taskQueuesObjectName);
1941:                        }
1942:                        MBeanServerManager.registerMBean(
1943:                                new BackendTaskQueuesControl(taskqueues),
1944:                                taskQueuesObjectName);
1945:                    } catch (Exception e) {
1946:                        if (logger.isWarnEnabled()) {
1947:                            logger.warn(
1948:                                    "failed to register task queue mbeans for "
1949:                                            + db, e);
1950:                        }
1951:                    }
1952:                    db.setTaskQueues(taskqueues);
1953:                    db.startWorkerThreads(this );
1954:                    db.startDeadlockDetectionThread(this .vdb);
1955:                    db.enableWrite();
1956:                }
1957:
1958:                db.enableRead();
1959:                try {
1960:                    backendListLock.acquireWrite();
1961:                } catch (InterruptedException e) {
1962:                    logger
1963:                            .error(
1964:                                    "Error while acquiring write lock in enableBackend",
1965:                                    e);
1966:                }
1967:
1968:                synchronized (enabledBackends) {
1969:                    enabledBackends.add(db);
1970:                }
1971:
1972:                backendListLock.releaseWrite();
1973:            }
1974:
1975:            /**
1976:             * Disables a backend that was previously enabled.
1977:             * <p>
1978:             * Ask the corresponding connection manager to finalize the connections if
1979:             * needed.
1980:             * <p>
1981:             * No sanity checks are performed by this function.
1982:             *
1983:             * @param db the database backend to disable
1984:             * @param forceDisable true if disabling must be forced on the backend
1985:             * @throws SQLException if an error occurs
1986:             */
1987:            public void disableBackend(DatabaseBackend db, boolean forceDisable)
1988:                    throws SQLException {
1989:                if (!db.disable()) {
1990:                    // Another thread has already started the disable process
1991:                    return;
1992:                }
1993:                synchronized (this ) {
1994:                    try {
1995:                        backendListLock.acquireWrite();
1996:                    } catch (InterruptedException e) {
1997:                        logger
1998:                                .error(
1999:                                        "Error while acquiring write lock in enableBackend",
2000:                                        e);
2001:                    }
2002:
2003:                    try {
2004:                        synchronized (enabledBackends) {
2005:                            enabledBackends.remove(db);
2006:                            if (enabledBackends.isEmpty()) {
2007:                                // Cleanup schema for any remaining locks
2008:                                this .vdb.getRequestManager().setDatabaseSchema(
2009:                                        null, false);
2010:                            }
2011:                        }
2012:
2013:                        if (!forceDisable)
2014:                            terminateThreadsAndConnections(db);
2015:                    } finally {
2016:                        backendListLock.releaseWrite();
2017:                    }
2018:
2019:                    if (forceDisable) {
2020:                        db.shutdownConnectionManagers();
2021:                        terminateThreadsAndConnections(db, false);
2022:                    }
2023:
2024:                    // sanity check on backend's active transaction
2025:                    if (!db.getActiveTransactions().isEmpty()) {
2026:                        if (logger.isWarnEnabled()) {
2027:                            logger.warn("Active transactions after backend "
2028:                                    + db.getName() + " is disabled: "
2029:                                    + db.getActiveTransactions());
2030:                        }
2031:                    }
2032:                }
2033:            }
2034:
2035:            private void terminateThreadsAndConnections(DatabaseBackend db)
2036:                    throws SQLException {
2037:                terminateThreadsAndConnections(db, true);
2038:            }
2039:
2040:            private void terminateThreadsAndConnections(DatabaseBackend db,
2041:                    boolean wait) throws SQLException {
2042:                db.terminateWorkerThreads(wait);
2043:                db.terminateDeadlockDetectionThread();
2044:
2045:                if (db.isInitialized())
2046:                    db.finalizeConnections();
2047:            }
2048:
2049:            //
2050:            // Debug/Monitoring
2051:            //
2052:
2053:            /**
2054:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2055:             */
2056:            public String getXmlImpl() {
2057:                StringBuffer info = new StringBuffer();
2058:                info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2059:                if (waitForCompletionPolicy != null)
2060:                    info.append(waitForCompletionPolicy.getXml());
2061:                if (macroHandler != null)
2062:                    info.append(macroHandler.getXml());
2063:                info.append(getRaidb1Xml());
2064:                info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2065:                return info.toString();
2066:            }
2067:
2068:            /**
2069:             * Surrounding raidb1 tags can be treated by <method>getXmlImpl </method>
2070:             * above, but more detailed content have to be returned by the method
2071:             * <method>getRaidb1Xml </method> below.
2072:             *
2073:             * @return content of Raidb1 xml
2074:             */
2075:            public abstract String getRaidb1Xml();
2076:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.