Source Code Cross Referenced for RAIDb2.java in  » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » loadbalancer » raidb2 » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.loadbalancer.raidb2 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004:         * Science And Control (INRIA).
0005:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006:         * Contact: sequoia@continuent.org
0007:         * 
0008:         * Licensed under the Apache License, Version 2.0 (the "License");
0009:         * you may not use this file except in compliance with the License.
0010:         * You may obtain a copy of the License at
0011:         * 
0012:         * http://www.apache.org/licenses/LICENSE-2.0
0013:         * 
0014:         * Unless required by applicable law or agreed to in writing, software
0015:         * distributed under the License is distributed on an "AS IS" BASIS,
0016:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017:         * See the License for the specific language governing permissions and
0018:         * limitations under the License. 
0019:         *
0020:         * Initial developer(s): Emmanuel Cecchet. 
0021:         * Contributor(s): Jean-Bernard van Zuylen.
0022:         */package org.continuent.sequoia.controller.loadbalancer.raidb2;
0023:
0024:        import java.sql.Connection;
0025:        import java.sql.SQLException;
0026:        import java.util.ArrayList;
0027:        import java.util.List;
0028:
0029:        import org.continuent.sequoia.common.exceptions.BadConnectionException;
0030:        import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0031:        import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0032:        import org.continuent.sequoia.common.exceptions.NotImplementedException;
0033:        import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
0034:        import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0035:        import org.continuent.sequoia.common.i18n.Translate;
0036:        import org.continuent.sequoia.common.log.Trace;
0037:        import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0038:        import org.continuent.sequoia.controller.backend.DatabaseBackend;
0039:        import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0040:        import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0041:        import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0042:        import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0043:        import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0044:        import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0045:        import org.continuent.sequoia.controller.connection.PooledConnection;
0046:        import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0047:        import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0048:        import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
0049:        import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0050:        import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableException;
0051:        import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
0052:        import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableRule;
0053:        import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0054:        import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask;
0055:        import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask;
0056:        import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask;
0057:        import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
0058:        import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
0059:        import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
0060:        import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
0061:        import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0062:        import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
0063:        import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
0064:        import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask;
0065:        import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask;
0066:        import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0067:        import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0068:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0069:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0070:        import org.continuent.sequoia.controller.requests.ParsingGranularities;
0071:        import org.continuent.sequoia.controller.requests.SelectRequest;
0072:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0073:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0074:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
0075:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0076:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0077:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0078:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0079:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0080:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0081:
0082:        /**
0083:         * RAIDb-2 load balancer.
0084:         * <p>
0085:         * This class is an abstract class because the read requests coming from the
0086:         * Request Manager are NOT treated here but in the subclasses. Transaction
0087:         * management and write requests are broadcasted to all backends owning the
0088:         * written table.
0089:         * 
0090:         * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0091:         * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0092:         *         </a>
0093:         * @version 1.0
0094:         */
0095:        public abstract class RAIDb2 extends AbstractLoadBalancer {
0096:            //
0097:            // How the code is organized ?
0098:            // 1. Member variables
0099:            // 2. Constructor(s)
0100:            // 3. Request handling
0101:            // 4. Transaction handling
0102:            // 5. Backend management
0103:            //
0104:
0105:            protected CreateTablePolicy createTablePolicy;
0106:            protected static Trace logger = Trace
0107:                    .getLogger("org.continuent.sequoia.controller.loadbalancer.raidb2");
0108:
0109:            /*
0110:             * Constructors
0111:             */
0112:
/**
 * Builds a RAIDb-2 request load balancer attached to a virtual database.
 * <p>
 * The parent load balancer is initialized with the RAIDb-2 level and a
 * table-level parsing granularity; both policies are then stored on this
 * instance.
 *
 * @param vdb the virtual database this load balancer belongs to
 * @param waitForCompletionPolicy policy telling how many backends must
 *          complete before a result is returned
 * @param createTablePolicy policy defining how 'create table' statements
 *          should be handled
 * @exception Exception if an error occurs
 */
public RAIDb2(VirtualDatabase vdb,
        WaitForCompletionPolicy waitForCompletionPolicy,
        CreateTablePolicy createTablePolicy) throws Exception {
    super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

    this.createTablePolicy = createTablePolicy;
    this.waitForCompletionPolicy = waitForCompletionPolicy;
}
0132:
0133:            /*
0134:             * Request Handling
0135:             */
0136:
0137:            /**
0138:             * Implementation specific load balanced read execution.
0139:             * 
0140:             * @param request an <code>SelectRequest</code>
0141:             * @param metadataCache the metadataCache if any or null
0142:             * @return the corresponding <code>java.sql.ResultSet</code>
0143:             * @exception SQLException if an error occurs
0144:             */
0145:            public abstract ControllerResultSet statementExecuteQuery(
0146:                    SelectRequest request, MetadataCache metadataCache)
0147:                    throws SQLException;
0148:
0149:            /**
0150:             * Performs a write request. This request is broadcasted to all nodes that
0151:             * owns the table to be written.
0152:             * 
0153:             * @param request an <code>AbstractWriteRequest</code>
0154:             * @return number of rows affected by the request
0155:             * @throws AllBackendsFailedException if all backends failed to execute the
0156:             *           request
0157:             * @exception SQLException if an error occurs
0158:             */
0159:            public ExecuteUpdateResult statementExecuteUpdate(
0160:                    AbstractWriteRequest request)
0161:                    throws AllBackendsFailedException, SQLException {
0162:                return ((StatementExecuteUpdateTask) execWriteRequest(request,
0163:                        false, null)).getResult();
0164:            }
0165:
0166:            /**
0167:             * Perform a write request and return the auto generated keys.
0168:             * 
0169:             * @param request the request to execute
0170:             * @param metadataCache the metadataCache if any or null
0171:             * @return update count and auto generated keys.
0172:             * @throws AllBackendsFailedException if all backends failed to execute the
0173:             *           request
0174:             * @exception SQLException if an error occurs
0175:             */
0176:            public GeneratedKeysResult statementExecuteUpdateWithKeys(
0177:                    AbstractWriteRequest request, MetadataCache metadataCache)
0178:                    throws AllBackendsFailedException, SQLException {
0179:                return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(
0180:                        request, true, metadataCache)).getResult();
0181:            }
0182:
0183:            /**
0184:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
0185:             *      MetadataCache)
0186:             */
0187:            public ExecuteResult statementExecute(AbstractRequest request,
0188:                    MetadataCache metadataCache) throws SQLException,
0189:                    AllBackendsFailedException {
0190:                throw new NotImplementedException(
0191:                        "Statement.execute() is currently not supported with RAIDb-2");
0192:            }
0193:
0194:            /**
0195:             * Common code for execWriteRequest(AbstractWriteRequest) and
0196:             * execWriteRequestWithKeys(AbstractWriteRequest).
0197:             * <p>
0198:             * Note that macros are processed here.
0199:             * <p>
0200:             * The result is given back in AbstractTask.getResult().
0201:             * 
0202:             * @param request the request to execute
0203:             * @param useKeys true if this must give an auto generated keys ResultSet
0204:             * @param metadataCache the metadataCache if any or null
0205:             * @throws AllBackendsFailedException if all backends failed to execute the
0206:             *           request
0207:             * @throws SQLException if an error occurs
0208:             */
0209:            private AbstractTask execWriteRequest(AbstractWriteRequest request,
0210:                    boolean useKeys, MetadataCache metadataCache)
0211:                    throws AllBackendsFailedException, SQLException {
0212:                // Handle macros
0213:                handleMacros(request);
0214:
0215:                // Total ordering mainly for distributed virtual databases.
0216:                boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0217:                        true);
0218:
0219:                // Log lazy begin if needed
0220:                if (request.isLazyTransactionStart())
0221:                    this .vdb.getRequestManager().logLazyTransactionBegin(
0222:                            request.getTransactionId());
0223:
0224:                // Log request
0225:                recoveryLog.logRequestExecuting(request);
0226:
0227:                int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0228:                        String.valueOf(request.getId()));
0229:
0230:                List writeList = new ArrayList();
0231:
0232:                if (request.isCreate()) {
0233:                    try {
0234:                        writeList = getBackendsForCreateTableRequest(request
0235:                                .getTableName());
0236:                    } catch (CreateTableException e) {
0237:                        releaseLockAndUnlockNextQuery(request);
0238:                        throw new SQLException(Translate.get(
0239:                                "loadbalancer.create.table.rule.failed", e
0240:                                        .getMessage()));
0241:                    }
0242:                } else {
0243:                    writeList = getBackendsWithTable(request.getTableName(),
0244:                            nbOfThreads);
0245:                }
0246:
0247:                nbOfThreads = writeList.size();
0248:                if (nbOfThreads == 0) {
0249:                    String msg = Translate.get(
0250:                            "loadbalancer.execute.no.backend.found", request
0251:                                    .getSqlShortForm(vdb
0252:                                            .getSqlShortFormLength()));
0253:                    logger.warn(msg);
0254:
0255:                    releaseLockAndUnlockNextQuery(request);
0256:                    throw new SQLException(msg);
0257:                }
0258:                if (logger.isDebugEnabled()) {
0259:                    logger.debug(Translate.get(
0260:                            "loadbalancer.execute.on.several", String
0261:                                    .valueOf(request.getId()), String
0262:                                    .valueOf(nbOfThreads)));
0263:                }
0264:
0265:                // Create the task
0266:                AbstractTask task;
0267:                if (useKeys)
0268:                    task = new StatementExecuteUpdateWithKeysTask(
0269:                            getNbToWait(nbOfThreads), nbOfThreads, request,
0270:                            metadataCache);
0271:                else
0272:                    task = new StatementExecuteUpdateTask(
0273:                            getNbToWait(nbOfThreads), nbOfThreads, request);
0274:
0275:                atomicTaskPostInQueueAndReleaseLock(request, task, writeList,
0276:                        removeFromTotalOrderQueue);
0277:
0278:                synchronized (task) {
0279:                    if (!task.hasCompleted())
0280:                        waitForTaskCompletion(request.getTimeout() * 1000L,
0281:                                String.valueOf(request.getId()), task);
0282:
0283:                    checkTaskCompletion(task);
0284:                    return task;
0285:                }
0286:            }
0287:
0288:            /**
0289:             * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends containing
0290:             * the table identified by <code>tableName</code>.
0291:             * 
0292:             * @param tableName name of the table
0293:             * @param nbOfThreads number of threads
0294:             * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends
0295:             *         containing the table identified by <code>tableName</code>
0296:             */
0297:            // FIXME why don't we just iterate on the enabledBackends list (assuming we
0298:            // acquire/release the read lock)?
0299:            // TODO should be moved to VirtualDatabase
0300:            private List getBackendsWithTable(String tableName, int nbOfThreads) {
0301:                List backendsWithTable = new ArrayList();
0302:                for (int i = 0; i < nbOfThreads; i++) {
0303:                    DatabaseBackend b = (DatabaseBackend) enabledBackends
0304:                            .get(i);
0305:                    if (b.hasTable(tableName))
0306:                        backendsWithTable.add(b);
0307:                }
0308:                return backendsWithTable;
0309:            }
0310:
0311:            /**
0312:             * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends with a
0313:             * transaction identified by <code>tid</code> already started.
0314:             * 
0315:             * @param tid Transaction identifier
0316:             * @param nbOfThreads number of threads
0317:             * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends with a
0318:             *         transaction identified by <code>tid</code> already started
0319:             */
0320:            // FIXME why don't we just iterate on the enabledBackends list (assuming we
0321:            // acquire/release the read lock)?
0322:            // TODO should be moved to VirtualDatabase
0323:            private List getBackendsWithStartedTransaction(Long tid,
0324:                    int nbOfThreads) {
0325:                // Build the list of backends that need to commit this transaction
0326:                List backendsWithStartedTransaction = new ArrayList(nbOfThreads);
0327:                for (int i = 0; i < nbOfThreads; i++) {
0328:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
0329:                            .get(i);
0330:                    if (backend.isStartedTransaction(tid))
0331:                        backendsWithStartedTransaction.add(backend);
0332:                }
0333:                return backendsWithStartedTransaction;
0334:            }
0335:
0336:            /**
0337:             * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends containing
0338:             * the stored procedure identified by <code>procedureName</code>.
0339:             * 
0340:             * @param procedureName name of the stored procedure
0341:             * @param nbOfParameters number of parameters of the stored procedure
0342:             * @param nbOfThreads number of threads
0343:             * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends
0344:             *         containing the table identified by <code>tableName</code>
0345:             */
0346:            // FIXME why don't we just iterate on the enabledBackends list (assuming we
0347:            // acquire/release the read lock)?
0348:            // TODO should be moved to VirtualDatabase
0349:            private List getBackendsWithStoredProcedure(String procedureName,
0350:                    int nbOfParameters, int nbOfThreads) {
0351:                List backendsWithStoredProcedure = new ArrayList(nbOfThreads);
0352:                for (int i = 0; i < nbOfThreads; i++) {
0353:                    DatabaseBackend b = (DatabaseBackend) enabledBackends
0354:                            .get(i);
0355:                    if (b.hasStoredProcedure(procedureName, nbOfParameters))
0356:                        backendsWithStoredProcedure.add(b);
0357:                }
0358:                return backendsWithStoredProcedure;
0359:            }
0360:
0361:            /**
0362:             * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends according
0363:             * to the create table policy.
0364:             * 
0365:             * @param tableName the name of the table to create
0366:             * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends
0367:             *         according to the create table policy.
0368:             * @throws CreateTableException if an error occurs while getting the backends
0369:             *           according to the create table policy
0370:             */
0371:            // TODO should be moved to VirtualDatabase
0372:            private List getBackendsForCreateTableRequest(String tableName)
0373:                    throws CreateTableException {
0374:                // Choose the backend according to the defined policy
0375:                CreateTableRule rule = createTablePolicy
0376:                        .getTableRule(tableName);
0377:                if (rule == null) {
0378:                    rule = createTablePolicy.getDefaultRule();
0379:                }
0380:                return rule.getBackends(vdb.getBackends());
0381:            }
0382:
0383:            /**
0384:             * Executes a select request <em>within a transation</em> on a given
0385:             * backend. The transaction is lazily begun if it was not already started.
0386:             * 
0387:             * @param request the Select request to execute
0388:             * @param backend the backend on which the request must be executed
0389:             * @param metadataCache the metadata cache
0390:             * @return a <code>ControllerResultSet</code>
0391:             * @throws SQLException if an error occurs while executing the request
0392:             */
0393:            private ControllerResultSet executeSelectRequestInTransaction(
0394:                    SelectRequest request, DatabaseBackend backend,
0395:                    MetadataCache metadataCache) throws SQLException {
0396:                AbstractConnectionManager cm = backend
0397:                        .getConnectionManager(request.getLogin());
0398:
0399:                if (cm == null) {
0400:                    String msg = Translate.get(
0401:                            "loadbalancer.connectionmanager.not.found", request
0402:                                    .getLogin(), backend.getName());
0403:                    logger.error(msg);
0404:                    throw new SQLException(msg);
0405:                }
0406:
0407:                Connection c;
0408:                long tid = request.getTransactionId();
0409:
0410:                try {
0411:                    c = backend
0412:                            .getConnectionForTransactionAndLazyBeginIfNeeded(
0413:                                    request, cm);
0414:                } catch (UnreachableBackendException e1) {
0415:                    logger.error(Translate.get(
0416:                            "loadbalancer.backend.disabling.unreachable",
0417:                            backend.getName()));
0418:                    disableBackend(backend, true);
0419:                    throw new SQLException(Translate.get(
0420:                            "loadbalancer.backend.unreacheable", backend
0421:                                    .getName()));
0422:                } catch (NoTransactionStartWhenDisablingException e) {
0423:                    String msg = Translate.get(
0424:                            "loadbalancer.backend.is.disabling", request
0425:                                    .getSqlShortForm(vdb
0426:                                            .getSqlShortFormLength()), backend
0427:                                    .getName());
0428:                    logger.error(msg);
0429:                    throw new SQLException(msg);
0430:                }
0431:
0432:                if (c == null)
0433:                    throw new SQLException(Translate.get(
0434:                            "loadbalancer.unable.retrieve.connection", String
0435:                                    .valueOf(tid), backend.getName()));
0436:
0437:                try {
0438:                    ControllerResultSet rs = executeStatementExecuteQueryOnBackend(
0439:                            request, backend, null, c, metadataCache);
0440:                    if (logger.isDebugEnabled()) {
0441:                        logger.debug(Translate.get(
0442:                                "loadbalancer.execute.transaction.on",
0443:                                new String[] { String.valueOf(tid),
0444:                                        String.valueOf(request.getId()),
0445:                                        backend.getName() }));
0446:                    }
0447:                    return rs;
0448:                } catch (SQLException e) {
0449:                    throw e;
0450:                } catch (BadConnectionException e) { // Connection failed, so did the transaction
0451:                    // Disable the backend.
0452:                    cm.deleteConnection(tid);
0453:                    String msg = Translate
0454:                            .get(
0455:                                    "loadbalancer.backend.disabling.connection.failure",
0456:                                    backend.getName());
0457:                    logger.error(msg);
0458:                    disableBackend(backend, true);
0459:                    throw new SQLException(msg);
0460:                } catch (Throwable e) {
0461:                    throw new SQLException(Translate.get(
0462:                            "loadbalancer.request.failed.on.backend",
0463:                            new String[] {
0464:                                    request.getSqlShortForm(vdb
0465:                                            .getSqlShortFormLength()),
0466:                                    backend.getName(), e.getMessage() }));
0467:                }
0468:            }
0469:
0470:            /**
0471:             * Executes a select request <em>in autocommit mode</em> on a given backend.
0472:             * 
0473:             * @param request the Select request to execute
0474:             * @param backend the backend on which the request must be executed
0475:             * @param metadataCache the metadata cache
0476:             * @return a <code>ControllerResultSet</code>
0477:             * @throws SQLException if an error occurs while executing the request
0478:             * @throws UnreachableBackendException if the backend is not reachable
0479:             */
0480:            private ControllerResultSet executeSelectRequestInAutoCommit(
0481:                    SelectRequest request, DatabaseBackend backend,
0482:                    MetadataCache metadataCache) throws SQLException,
0483:                    UnreachableBackendException {
0484:                AbstractConnectionManager cm = backend
0485:                        .getConnectionManager(request.getLogin());
0486:
0487:                // Sanity check
0488:                if (cm == null) {
0489:                    String msg = Translate.get(
0490:                            "loadbalancer.connectionmanager.not.found", request
0491:                                    .getLogin(), backend.getName());
0492:                    logger.error(msg);
0493:                    throw new SQLException(msg);
0494:                }
0495:
0496:                ControllerResultSet rs = null;
0497:                boolean badConnection;
0498:                do {
0499:                    badConnection = false;
0500:                    // Use a connection just for this request
0501:                    PooledConnection c = null;
0502:                    try {
0503:                        c = cm.retrieveConnectionInAutoCommit(request);
0504:                    } catch (UnreachableBackendException e1) {
0505:                        logger.error(Translate.get(
0506:                                "loadbalancer.backend.disabling.unreachable",
0507:                                backend.getName()));
0508:                        disableBackend(backend, true);
0509:                        throw new UnreachableBackendException(Translate.get(
0510:                                "loadbalancer.backend.unreacheable", backend
0511:                                        .getName()));
0512:                    }
0513:
0514:                    // Sanity check
0515:                    if (c == null)
0516:                        throw new UnreachableBackendException(
0517:                                "No more connections on backend "
0518:                                        + backend.getName());
0519:
0520:                    // Execute Query
0521:                    try {
0522:                        rs = executeStatementExecuteQueryOnBackend(request,
0523:                                backend, null, c.getConnection(), metadataCache);
0524:                        cm.releaseConnectionInAutoCommit(request, c);
0525:                    } catch (SQLException e) {
0526:                        cm.releaseConnectionInAutoCommit(request, c);
0527:                        throw new SQLException(Translate.get(
0528:                                "loadbalancer.request.failed.on.backend",
0529:                                new String[] {
0530:                                        request.getSqlShortForm(vdb
0531:                                                .getSqlShortFormLength()),
0532:                                        backend.getName(), e.getMessage() }));
0533:                    } catch (BadConnectionException e) { // Get rid of the bad connection
0534:                        cm.deleteConnection(c);
0535:                        badConnection = true;
0536:                    } catch (Throwable e) {
0537:                        throw new SQLException(Translate.get(
0538:                                "loadbalancer.request.failed.on.backend",
0539:                                new String[] {
0540:                                        request.getSqlShortForm(vdb
0541:                                                .getSqlShortFormLength()),
0542:                                        backend.getName(), e.getMessage() }));
0543:                    }
0544:                } while (badConnection);
0545:                if (logger.isDebugEnabled())
0546:                    logger
0547:                            .debug(Translate.get("loadbalancer.execute.on",
0548:                                    String.valueOf(request.getId()), backend
0549:                                            .getName()));
0550:                return rs;
0551:            }
0552:
0553:            /**
0554:             * Execute a read request on the selected backend.
0555:             * 
0556:             * @param request the request to execute
0557:             * @param backend the backend that will execute the request
0558:             * @param metadataCache a metadataCache if any or null
0559:             * @return the ResultSet
0560:             * @throws SQLException if an error occurs
0561:             */
0562:            protected ControllerResultSet executeReadRequestOnBackend(
0563:                    SelectRequest request, DatabaseBackend backend,
0564:                    MetadataCache metadataCache) throws SQLException,
0565:                    UnreachableBackendException {
0566:                // Handle macros
0567:                handleMacros(request);
0568:
0569:                // Execute the query
0570:                if (request.isAutoCommit()) {
0571:                    return executeSelectRequestInAutoCommit(request, backend,
0572:                            metadataCache);
0573:                } else {
0574:                    return executeSelectRequestInTransaction(request, backend,
0575:                            metadataCache);
0576:                }
0577:            }
0578:
0579:            protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1; // NOTE(review): presumably denotes CallableStatement.executeQuery() - confirm where it is used
0580:            protected static final int CALLABLE_STATEMENT_EXECUTE = 2; // NOTE(review): presumably denotes CallableStatement.execute(); the nearby stored-procedure methods take a boolean isExecuteQuery flag instead
0581:
0582:            /**
0583:             * Execute a stored procedure on the selected backend.
0584:             * 
0585:             * @param proc the stored procedure to execute
0586:             * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0587:             *          false if we must call CallableStatement.execute()
0588:             * @param backend the backend that will execute the request
0589:             * @param metadataCache the metadataCache if any or null
0590:             * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0591:             *         <code>ExecuteResult</code> object otherwise
0592:             * @throws SQLException if an error occurs
0593:             */
0594:            protected Object executeStoredProcedureOnBackend(
0595:                    StoredProcedure proc, boolean isExecuteQuery,
0596:                    DatabaseBackend backend, MetadataCache metadataCache)
0597:                    throws SQLException, UnreachableBackendException {
0598:                // Execute the query
0599:                if (proc.isAutoCommit()) {
0600:                    return executeStoredProcedureInAutoCommit(proc,
0601:                            isExecuteQuery, backend, metadataCache);
0602:                } else {
0603:                    return executeStoredProcedureInTransaction(proc,
0604:                            isExecuteQuery, backend, metadataCache);
0605:                }
0606:            }
0607:
0608:            /**
0609:             * Executes a stored procedure <em>within a transation</em> on the selected
0610:             * backend. The transaction is lazily started if it has not already begun.
0611:             * 
0612:             * @param proc the <em>autocommit</em> stored procedure to execute
0613:             * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0614:             *          false if we must call CallableStatement.execute()
0615:             * @param backend the backend that will execute the request
0616:             * @param metadataCache the metadataCache if any or null
0617:             * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0618:             *         <code>ExecuteResult</code> object otherwise
0619:             * @throws SQLException if an error occurs
0620:             */
0621:            private Object executeStoredProcedureInTransaction(
0622:                    StoredProcedure proc, boolean isExecuteQuery,
0623:                    DatabaseBackend backend, MetadataCache metadataCache)
0624:                    throws SQLException {
0625:                // Inside a transaction
0626:                // Ok, we have a backend, let's execute the request
0627:                AbstractConnectionManager cm = backend
0628:                        .getConnectionManager(proc.getLogin());
0629:
0630:                // Sanity check
0631:                if (cm == null) {
0632:                    String msg = Translate.get(
0633:                            "loadbalancer.connectionmanager.not.found", proc
0634:                                    .getLogin(), backend.getName());
0635:                    logger.error(msg);
0636:                    throw new SQLException(msg);
0637:                }
0638:
0639:                Connection c;
0640:                long tid = proc.getTransactionId();
0641:
0642:                try {
0643:                    c = backend
0644:                            .getConnectionForTransactionAndLazyBeginIfNeeded(
0645:                                    proc, cm);
0646:                } catch (UnreachableBackendException e1) {
0647:                    logger.error(Translate.get(
0648:                            "loadbalancer.backend.disabling.unreachable",
0649:                            backend.getName()));
0650:                    disableBackend(backend, true);
0651:                    throw new SQLException(Translate.get(
0652:                            "loadbalancer.backend.unreacheable", backend
0653:                                    .getName()));
0654:                } catch (NoTransactionStartWhenDisablingException e) {
0655:                    String msg = Translate.get(
0656:                            "loadbalancer.backend.is.disabling", proc
0657:                                    .getSqlShortForm(vdb
0658:                                            .getSqlShortFormLength()), backend
0659:                                    .getName());
0660:                    logger.error(msg);
0661:                    throw new SQLException(msg);
0662:                }
0663:
0664:                // Sanity check
0665:                if (c == null)
0666:                    throw new SQLException(Translate.get(
0667:                            "loadbalancer.unable.retrieve.connection", String
0668:                                    .valueOf(tid), backend.getName()));
0669:
0670:                // Execute Query
0671:                try {
0672:                    if (isExecuteQuery)
0673:                        return AbstractLoadBalancer
0674:                                .executeCallableStatementExecuteQueryOnBackend(
0675:                                        proc, backend, null, c, metadataCache);
0676:                    else
0677:                        return AbstractLoadBalancer
0678:                                .executeCallableStatementExecuteOnBackend(
0679:                                        proc,
0680:                                        backend,
0681:                                        null,
0682:                                        cm
0683:                                                .retrieveConnectionForTransaction(tid),
0684:                                        metadataCache);
0685:                } catch (Exception e) {
0686:                    throw new SQLException(Translate.get(
0687:                            "loadbalancer.storedprocedure.failed.on.backend",
0688:                            new String[] {
0689:                                    proc.getSqlShortForm(vdb
0690:                                            .getSqlShortFormLength()),
0691:                                    backend.getName(), e.getMessage() }));
0692:                } finally {
0693:                    if (logger.isDebugEnabled())
0694:                        logger.debug(Translate.get(
0695:                                "loadbalancer.execute.transaction.on",
0696:                                new String[] { String.valueOf(tid),
0697:                                        String.valueOf(proc.getId()),
0698:                                        backend.getName() }));
0699:                }
0700:            }
0701:
0702:            /**
0703:             * Executes a stored procedure <em>in autocommit mode</em> on the selected
0704:             * backend.
0705:             * 
0706:             * @param proc the <em>autocommit</em> stored procedure to execute
0707:             * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0708:             *          false if we must call CallableStatement.execute()
0709:             * @param backend the backend that will execute the request
0710:             * @param metadataCache the metadataCache if any or null
0711:             * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0712:             *         <code>ExecuteResult</code> object otherwise
0713:             * @throws SQLException if an error occurs
0714:             */
0715:            private Object executeStoredProcedureInAutoCommit(
0716:                    StoredProcedure proc, boolean isExecuteQuery,
0717:                    DatabaseBackend backend, MetadataCache metadataCache)
0718:                    throws SQLException, UnreachableBackendException {
0719:                // Ok, we have a backend, let's execute the request
0720:                AbstractConnectionManager cm = backend
0721:                        .getConnectionManager(proc.getLogin());
0722:
0723:                // Sanity check
0724:                if (cm == null) {
0725:                    String msg = Translate.get(
0726:                            "loadbalancer.connectionmanager.not.found", proc
0727:                                    .getLogin(), backend.getName());
0728:                    logger.error(msg);
0729:                    throw new SQLException(msg);
0730:                }
0731:
0732:                // Use a connection just for this request
0733:                PooledConnection c = null;
0734:                try {
0735:                    c = cm.retrieveConnectionInAutoCommit(proc);
0736:                } catch (UnreachableBackendException e1) {
0737:                    logger.error(Translate.get(
0738:                            "loadbalancer.backend.disabling.unreachable",
0739:                            backend.getName()));
0740:                    disableBackend(backend, true);
0741:                    throw new UnreachableBackendException(Translate.get(
0742:                            "loadbalancer.backend.unreacheable", backend
0743:                                    .getName()));
0744:                }
0745:
0746:                // Sanity check
0747:                if (c == null)
0748:                    throw new SQLException(Translate.get(
0749:                            "loadbalancer.backend.no.connection", backend
0750:                                    .getName()));
0751:
0752:                // Execute Query
0753:                try {
0754:                    if (isExecuteQuery)
0755:                        return AbstractLoadBalancer
0756:                                .executeCallableStatementExecuteQueryOnBackend(
0757:                                        proc, backend, null, c.getConnection(),
0758:                                        metadataCache);
0759:                    else
0760:                        return AbstractLoadBalancer
0761:                                .executeCallableStatementExecuteOnBackend(proc,
0762:                                        backend, null, c, metadataCache);
0763:                } catch (Exception e) {
0764:                    throw new SQLException(Translate.get(
0765:                            "loadbalancer.storedprocedure.failed.on.backend",
0766:                            new String[] {
0767:                                    proc.getSqlShortForm(vdb
0768:                                            .getSqlShortFormLength()),
0769:                                    backend.getName(), e.getMessage() }));
0770:                } finally {
0771:                    cm.releaseConnectionInAutoCommit(proc, c);
0772:                    if (logger.isDebugEnabled())
0773:                        logger.debug(Translate.get(
0774:                                "loadbalancer.storedprocedure.on", String
0775:                                        .valueOf(proc.getId()), backend
0776:                                        .getName()));
0777:                }
0778:            }
0779:
0780:            /**
0781:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
0782:             *      MetadataCache)
0783:             */
0784:            public ControllerResultSet callableStatementExecuteQuery(
0785:                    StoredProcedure proc, MetadataCache metadataCache)
0786:                    throws SQLException, AllBackendsFailedException {
0787:                CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure(
0788:                        proc, EXECUTE_QUERY_TASK, metadataCache);
0789:                return task.getResult();
0790:            }
0791:
0792:            /**
0793:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0794:             */
0795:            public ExecuteUpdateResult callableStatementExecuteUpdate(
0796:                    StoredProcedure proc) throws SQLException,
0797:                    AllBackendsFailedException {
0798:                CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure(
0799:                        proc, EXECUTE_UPDATE_TASK, null);
0800:                return task.getResult();
0801:            }
0802:
0803:            /**
0804:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(StoredProcedure,
0805:             *      MetadataCache)
0806:             */
0807:            public ExecuteResult callableStatementExecute(StoredProcedure proc,
0808:                    MetadataCache metadataCache) throws SQLException,
0809:                    AllBackendsFailedException {
0810:                CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure(
0811:                        proc, EXECUTE_TASK, metadataCache);
0812:                return task.getResult();
0813:            }
0814:
0815:            private static final int EXECUTE_QUERY_TASK = 0; // taskType for callStoredProcedure: creates a CallableStatementExecuteQueryTask
0816:            private static final int EXECUTE_UPDATE_TASK = 1; // taskType for callStoredProcedure: creates a CallableStatementExecuteUpdateTask
0817:            private static final int EXECUTE_TASK = 2; // taskType for callStoredProcedure: creates a CallableStatementExecuteTask
0818:
0819:            /**
0820:             * Post the stored procedure call in the threads task list.
0821:             * <p>
0822:             * Note that macros are also processed here.
0823:             * 
0824:             * @param proc the stored procedure to call
0825:             * @param taskType one of EXECUTE_QUERY_TASK, EXECUTE_UPDATE_TASK or
0826:             *          EXECUTE_TASK
0827:             * @param metadataCache the metadataCache if any or null
0828:             * @return the task that has been executed (caller can get the result by
0829:             *         calling getResult())
0830:             * @throws SQLException if an error occurs
0831:             * @throws AllBackendsFailedException if all backends failed to execute the
0832:             *           stored procedure
0833:             * @throws NoMoreBackendException if no backend was available to execute the
0834:             *           stored procedure
0835:             */
0836:            private AbstractTask callStoredProcedure(StoredProcedure proc,
0837:                    int taskType, MetadataCache metadataCache)
0838:                    throws SQLException, AllBackendsFailedException,
0839:                    NoMoreBackendException {
0840:                // Total ordering mainly for distributed virtual databases.
0841:                boolean removeFromTotalOrderQueue = waitForTotalOrder(proc,
0842:                        true);
0843:
0844:                // Handle macros
0845:                handleMacros(proc);
0846:
0847:                int nbOfThreads = acquireLockAndCheckNbOfThreads(proc, String
0848:                        .valueOf(proc.getId()));
0849:
0850:                // Create the task
0851:                AbstractTask task;
0852:                switch (taskType) {
0853:                case EXECUTE_QUERY_TASK:
0854:                    task = new CallableStatementExecuteQueryTask(
0855:                            getNbToWait(nbOfThreads), nbOfThreads, proc,
0856:                            metadataCache);
0857:                    break;
0858:                case EXECUTE_UPDATE_TASK:
0859:                    task = new CallableStatementExecuteUpdateTask(
0860:                            getNbToWait(nbOfThreads), nbOfThreads, proc);
0861:                    break;
0862:                case EXECUTE_TASK:
0863:                    task = new CallableStatementExecuteTask(
0864:                            getNbToWait(nbOfThreads), nbOfThreads, proc,
0865:                            metadataCache);
0866:                    break;
0867:                default:
0868:                    throw new RuntimeException("Unhandled task type "
0869:                            + taskType + " in callStoredProcedure");
0870:                }
0871:
0872:                List backendList = getBackendsWithStoredProcedure(proc
0873:                        .getProcedureKey(), proc.getNbOfParameters(),
0874:                        nbOfThreads);
0875:
0876:                if (backendList.size() == 0) {
0877:                    releaseLockAndUnlockNextQuery(proc);
0878:                    throw new SQLException(Translate.get(
0879:                            "loadbalancer.backend.no.required.storedprocedure",
0880:                            proc.getProcedureKey()));
0881:                }
0882:
0883:                task.setTotalNb(backendList.size());
0884:                atomicTaskPostInQueueAndReleaseLock(proc, task, backendList,
0885:                        removeFromTotalOrderQueue);
0886:
0887:                synchronized (task) {
0888:                    if (!task.hasCompleted())
0889:                        waitForTaskCompletion(proc.getTimeout() * 1000L, String
0890:                                .valueOf(proc.getId()), task);
0891:
0892:                    checkTaskCompletion(task);
0893:                    return task;
0894:                }
0895:            }
0896:
0897:            /**
0898:             * Check the completion status of the task and throws appropriate Exceptions
0899:             * if the status of the task was not successful, otherwise do nothing.
0900:             * 
0901:             * @param task the completed AbstractTask
0902:             * @throws AllBackendsFailedException if all backends failed to execute the
0903:             *           request
0904:             * @exception NoMoreBackendException if no backends are left to execute the
0905:             *              request
0906:             * @exception SQLException if an error occurs
0907:             */
0908:            private void checkTaskCompletion(AbstractTask task)
0909:                    throws NoMoreBackendException, AllBackendsFailedException,
0910:                    SQLException {
0911:                AbstractRequest request = task.getRequest();
0912:
0913:                if (task.getSuccess() > 0)
0914:                    return;
0915:
0916:                // Check that someone failed, it might be the case that we only have
0917:                // disabling backends left and they have not played this query (thus
0918:                // none of them have succeeded or failed).
0919:                if (task.getFailed() == 0) {
0920:                    throw new NoMoreBackendException(Translate
0921:                            .get("loadbalancer.backendlist.empty"));
0922:                }
0923:
0924:                if (task.getSuccess() == 0) {
0925:                    // All backends that executed the query failed
0926:                    List exceptions = task.getExceptions();
0927:                    if (exceptions == null)
0928:                        throw new AllBackendsFailedException(Translate.get(
0929:                                "loadbalancer.request.failed.all",
0930:                                new Object[] { request.getType(),
0931:                                        String.valueOf(request.getId()) }));
0932:                    else {
0933:                        String errorMsg = Translate.get(
0934:                                "loadbalancer.request.failed.stack",
0935:                                new Object[] { request.getType(),
0936:                                        String.valueOf(request.getId()) })
0937:                                + "\n";
0938:                        SQLException ex = SQLExceptionFactory.getSQLException(
0939:                                exceptions, errorMsg);
0940:                        logger.error(ex.getMessage());
0941:                        throw ex;
0942:                    }
0943:                }
0944:            }
0945:
0946:            /**
0947:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
0948:             */
0949:            public ControllerResultSet getPreparedStatementGetMetaData(
0950:                    AbstractRequest request) throws SQLException {
0951:                // Choose a backend
0952:                try {
0953:                    vdb.acquireReadLockBackendLists();
0954:                } catch (InterruptedException e) {
0955:                    String msg = Translate.get(
0956:                            "loadbalancer.backendlist.acquire.readlock.failed",
0957:                            e);
0958:                    logger.error(msg);
0959:                    throw new SQLException(msg);
0960:                }
0961:
0962:                /*
0963:                 * The backend that will execute the query
0964:                 */
0965:                DatabaseBackend backend = null;
0966:
0967:                // Note that vdb lock is released in the finally clause of this try/catch
0968:                // block
0969:                try {
0970:                    ArrayList backends = vdb.getBackends();
0971:                    int size = backends.size();
0972:
0973:                    if (size == 0)
0974:                        throw new SQLException(Translate.get(
0975:                                "loadbalancer.execute.no.backend.available",
0976:                                request.getId()));
0977:
0978:                    // Choose the first available backend
0979:                    for (int i = 0; i < size; i++) {
0980:                        DatabaseBackend b = (DatabaseBackend) backends.get(i);
0981:                        if (b.isReadEnabled()) {
0982:                            backend = b;
0983:                            break;
0984:                        }
0985:                    }
0986:                } catch (Throwable e) {
0987:                    String msg = Translate.get(
0988:                            "loadbalancer.execute.find.backend.failed",
0989:                            new String[] {
0990:                                    request.getSqlShortForm(vdb
0991:                                            .getSqlShortFormLength()),
0992:                                    e.getMessage() });
0993:                    logger.error(msg, e);
0994:                    throw new SQLException(msg);
0995:                } finally {
0996:                    vdb.releaseReadLockBackendLists();
0997:                }
0998:
0999:                if (backend == null)
1000:                    throw new NoMoreBackendException(Translate.get(
1001:                            "loadbalancer.execute.no.backend.enabled", request
1002:                                    .getId()));
1003:
1004:                // Ok, we have a backend, let's execute the request
1005:                AbstractConnectionManager cm = backend
1006:                        .getConnectionManager(request.getLogin());
1007:
1008:                // Sanity check
1009:                if (cm == null) {
1010:                    String msg = Translate.get(
1011:                            "loadbalancer.connectionmanager.not.found",
1012:                            new String[] { request.getLogin(),
1013:                                    backend.getName() });
1014:                    logger.error(msg);
1015:                    throw new SQLException(msg);
1016:                }
1017:
1018:                // Execute the query
1019:                if (request.isAutoCommit()) {
1020:                    ControllerResultSet rs = null;
1021:                    boolean badConnection;
1022:                    do {
1023:                        badConnection = false;
1024:                        // Use a connection just for this request
1025:                        PooledConnection c = null;
1026:                        try {
1027:                            c = cm.retrieveConnectionInAutoCommit(request);
1028:                        } catch (UnreachableBackendException e1) {
1029:                            logger
1030:                                    .error(Translate
1031:                                            .get(
1032:                                                    "loadbalancer.backend.disabling.unreachable",
1033:                                                    backend.getName()));
1034:                            disableBackend(backend, true);
1035:                            // Retry on a different backend
1036:                            return getPreparedStatementGetMetaData(request);
1037:                        }
1038:
1039:                        // Sanity check
1040:                        if (c == null)
1041:                            throw new SQLException(Translate.get(
1042:                                    "loadbalancer.backend.no.connection",
1043:                                    backend.getName()));
1044:
1045:                        // Execute Query
1046:                        try {
1047:                            rs = preparedStatementGetMetaDataOnBackend(request
1048:                                    .getSqlOrTemplate(), backend, c
1049:                                    .getConnection());
1050:                            cm.releaseConnectionInAutoCommit(request, c);
1051:                        } catch (SQLException e) {
1052:                            cm.releaseConnectionInAutoCommit(request, c);
1053:                            throw SQLExceptionFactory
1054:                                    .getSQLException(
1055:                                            e,
1056:                                            Translate
1057:                                                    .get(
1058:                                                            "loadbalancer.request.failed.on.backend",
1059:                                                            new String[] {
1060:                                                                    request
1061:                                                                            .getSqlShortForm(vdb
1062:                                                                                    .getSqlShortFormLength()),
1063:                                                                    backend
1064:                                                                            .getName(),
1065:                                                                    e
1066:                                                                            .getMessage() }));
1067:                        } catch (BadConnectionException e) { // Get rid of the bad connection
1068:                            cm.deleteConnection(c);
1069:                            badConnection = true;
1070:                        } catch (Throwable e) {
1071:                            cm.releaseConnectionInAutoCommit(request, c);
1072:                            throw new SQLException(
1073:                                    Translate
1074:                                            .get(
1075:                                                    "loadbalancer.request.failed.on.backend",
1076:                                                    new String[] {
1077:                                                            request
1078:                                                                    .getSqlShortForm(vdb
1079:                                                                            .getSqlShortFormLength()),
1080:                                                            backend.getName(),
1081:                                                            e.getMessage() }));
1082:                        }
1083:                    } while (badConnection);
1084:                    if (logger.isDebugEnabled())
1085:                        logger.debug(Translate.get("loadbalancer.execute.on",
1086:                                new String[] { String.valueOf(request.getId()),
1087:                                        backend.getName() }));
1088:                    return rs;
1089:                } else { // Inside a transaction
1090:                    Connection c;
1091:                    long tid = request.getTransactionId();
1092:
1093:                    try {
1094:                        c = backend
1095:                                .getConnectionForTransactionAndLazyBeginIfNeeded(
1096:                                        request, cm);
1097:                    } catch (UnreachableBackendException e1) {
1098:                        logger.error(Translate.get(
1099:                                "loadbalancer.backend.disabling.unreachable",
1100:                                backend.getName()));
1101:                        disableBackend(backend, true);
1102:                        throw new NoMoreBackendException(Translate.get(
1103:                                "loadbalancer.backend.unreacheable", backend
1104:                                        .getName()));
1105:                    } catch (NoTransactionStartWhenDisablingException e) {
1106:                        String msg = Translate.get(
1107:                                "loadbalancer.backend.is.disabling",
1108:                                new String[] {
1109:                                        request.getSqlShortForm(vdb
1110:                                                .getSqlShortFormLength()),
1111:                                        backend.getName() });
1112:                        logger.error(msg);
1113:                        throw new SQLException(msg);
1114:                    }
1115:
1116:                    // Sanity check
1117:                    if (c == null)
1118:                        throw new SQLException(Translate.get(
1119:                                "loadbalancer.unable.retrieve.connection",
1120:                                new String[] { String.valueOf(tid),
1121:                                        backend.getName() }));
1122:
1123:                    // Execute Query
1124:                    ControllerResultSet rs = null;
1125:                    try {
1126:                        rs = preparedStatementGetMetaDataOnBackend(request
1127:                                .getSqlOrTemplate(), backend, c);
1128:                    } catch (SQLException e) {
1129:                        throw e;
1130:                    } catch (BadConnectionException e) { // Connection failed, so did the transaction
1131:                        // Disable the backend.
1132:                        cm.deleteConnection(tid);
1133:                        String msg = Translate
1134:                                .get(
1135:                                        "loadbalancer.backend.disabling.connection.failure",
1136:                                        backend.getName());
1137:                        logger.error(msg);
1138:                        disableBackend(backend, true);
1139:                        throw new SQLException(msg);
1140:                    } catch (Throwable e) {
1141:                        throw new SQLException(Translate.get(
1142:                                "loadbalancer.request.failed.on.backend",
1143:                                new String[] {
1144:                                        request.getSqlShortForm(vdb
1145:                                                .getSqlShortFormLength()),
1146:                                        backend.getName(), e.getMessage() }));
1147:                    }
1148:                    if (logger.isDebugEnabled())
1149:                        logger.debug(Translate.get(
1150:                                "loadbalancer.execute.transaction.on",
1151:                                new String[] { String.valueOf(tid),
1152:                                        String.valueOf(request.getId()),
1153:                                        backend.getName() }));
1154:                    return rs;
1155:                }
1156:            }
1157:
1158:            /*
1159:             * Transaction management
1160:             */
1161:            /**
1162:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1163:             */
1164:            public void abort(TransactionMetaData tm) throws SQLException {
1165:                long tid = tm.getTransactionId();
1166:
1167:                DistributedRollback toqObject = null;
1168:                /*
1169:                 * Let previous queries be flushed into the load balancer queues so that we
1170:                 * can abort them and that no queries for that transaction wait in the total
1171:                 * order queue while we are aborting. Note that the wait and remove from the
1172:                 * total order queue will be done in the call to rollback at the end of this
1173:                 * method.
1174:                 */
1175:                if (vdb.getTotalOrderQueue() != null) {
1176:                    toqObject = new DistributedRollback(tm.getLogin(), tid);
1177:                    waitForTotalOrder(toqObject, false);
1178:                }
1179:
1180:                // Acquire the lock
1181:                String requestDescription = "abort " + tid;
1182:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1183:                        requestDescription);
1184:
1185:                boolean rollbackInProgress = false;
1186:                synchronized (enabledBackends) {
1187:                    // Abort all queries on all backends that have started this transaction
1188:                    for (int i = 0; i < nbOfThreads; i++) {
1189:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1190:                                .get(i);
1191:                        rollbackInProgress = rollbackInProgress
1192:                                || backend.getTaskQueues()
1193:                                        .abortAllQueriesForTransaction(tid);
1194:                    }
1195:                }
1196:
1197:                // Release the lock
1198:                backendListLock.releaseRead();
1199:
1200:                if (rollbackInProgress) { // already aborting
1201:                    if (vdb.getTotalOrderQueue() != null)
1202:                        removeObjectFromAndNotifyTotalOrderQueue(toqObject);
1203:                    return;
1204:                }
1205:
1206:                rollback(tm);
1207:            }
1208:
1209:            /**
1210:             * Begins a new transaction.
1211:             * 
1212:             * @param tm the transaction marker metadata
1213:             * @exception SQLException if an error occurs
1214:             */
1215:            public final void begin(TransactionMetaData tm) throws SQLException {
1216:            }
1217:
1218:            /**
1219:             * Commits a transaction.
1220:             * 
1221:             * @param tm the transaction marker metadata
1222:             * @exception SQLException if an error occurs
1223:             */
1224:            public void commit(TransactionMetaData tm) throws SQLException {
1225:                long tid = tm.getTransactionId();
1226:
1227:                // Ordering for distributed virtual database
1228:                boolean canTakeReadLock = false;
1229:                DistributedCommit totalOrderCommit = null;
1230:                if (vdb.getTotalOrderQueue() != null) {
1231:                    // Total ordering mainly for distributed virtual databases.
1232:                    // If waitForTotalOrder returns true then the query has been scheduled in
1233:                    // total order and there is no need to take a write lock later to resolve
1234:                    // potential conflicts.
1235:                    totalOrderCommit = new DistributedCommit(tm.getLogin(), tid);
1236:                    canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
1237:                    if (!canTakeReadLock)
1238:                        // This is a local commit no total order info
1239:                        totalOrderCommit = null;
1240:                }
1241:
1242:                // Acquire the lock
1243:                int nbOfThreads = acquireLockAndCheckNbOfThreads(
1244:                        totalOrderCommit, "commit " + tid);
1245:
1246:                List commitList = getBackendsWithStartedTransaction(new Long(
1247:                        tid), nbOfThreads);
1248:
1249:                int nbOfThreadsToCommit = commitList.size();
1250:                CommitTask task = null;
1251:                if (nbOfThreadsToCommit != 0) {
1252:                    task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1253:                            nbOfThreadsToCommit, tm);
1254:                }
1255:                // FIXME waht if nbOfThreadsToCommit == 0?
1256:
1257:                // Post the task in the non-conflicting queues.
1258:                synchronized (enabledBackends) {
1259:                    for (int i = 0; i < nbOfThreadsToCommit; i++) {
1260:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1261:                                .get(i);
1262:                        backend.getTaskQueues()
1263:                                .addTaskToBackendTotalOrderQueue(task);
1264:                    }
1265:                }
1266:
1267:                // Release the lock
1268:                backendListLock.releaseRead();
1269:
1270:                // Unblock next query from total order queue
1271:                if (totalOrderCommit != null)
1272:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1273:
1274:                waitForCommit(task, tm.getTimeout());
1275:            }
1276:
1277:            /**
1278:             * Rollbacks a transaction.
1279:             * 
1280:             * @param tm the transaction marker metadata
1281:             * @exception SQLException if an error occurs
1282:             */
1283:            public void rollback(TransactionMetaData tm) throws SQLException {
1284:                long tid = tm.getTransactionId();
1285:
1286:                // Ordering for distributed virtual database
1287:                DistributedRollback totalOrderRollback = null;
1288:                boolean canTakeReadLock = false;
1289:                if (vdb.getTotalOrderQueue() != null) {
1290:                    totalOrderRollback = new DistributedRollback(tm.getLogin(),
1291:                            tid);
1292:                    // Total ordering mainly for distributed virtual databases.
1293:                    // If waitForTotalOrder returns true then the query has been scheduled in
1294:                    // total order and there is no need to take a write lock later to resolve
1295:                    // potential conflicts.
1296:                    canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1297:                            false);
1298:                    if (!canTakeReadLock)
1299:                        // This is a local rollback no total order info
1300:                        totalOrderRollback = null;
1301:                }
1302:
1303:                // Acquire the lock
1304:                int nbOfThreads = acquireLockAndCheckNbOfThreads(
1305:                        totalOrderRollback, "rollback " + tid);
1306:
1307:                // Build the list of backends that need to rollback this transaction
1308:                List rollbackList = getBackendsWithStartedTransaction(new Long(
1309:                        tid), nbOfThreads);
1310:
1311:                int nbOfThreadsToRollback = rollbackList.size();
1312:                RollbackTask task = null;
1313:                if (nbOfThreadsToRollback != 0)
1314:                    task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1315:                            nbOfThreadsToRollback, tm);
1316:
1317:                // Post the task in the non-conflicting queues.
1318:                synchronized (enabledBackends) {
1319:                    for (int i = 0; i < nbOfThreadsToRollback; i++) {
1320:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1321:                                .get(i);
1322:                        backend.getTaskQueues()
1323:                                .addTaskToBackendTotalOrderQueue(task);
1324:                    }
1325:                }
1326:
1327:                // Release the lock
1328:                backendListLock.releaseRead();
1329:
1330:                // Unblock next query from total order queue
1331:                if (totalOrderRollback != null)
1332:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1333:
1334:                waitForRollback(task, tm.getTimeout());
1335:            }
1336:
1337:            /**
1338:             * Rollback a transaction to a savepoint
1339:             * 
1340:             * @param tm The transaction marker metadata
1341:             * @param savepointName The name of the savepoint
1342:             * @throws SQLException if an error occurs
1343:             */
1344:            public void rollbackToSavepoint(TransactionMetaData tm,
1345:                    String savepointName) throws SQLException {
1346:                long tid = tm.getTransactionId();
1347:
1348:                // Ordering for distributed virtual database
1349:                DistributedRollbackToSavepoint totalOrderRollback = null;
1350:                boolean canTakeReadLock = false;
1351:                if (vdb.getTotalOrderQueue() != null) {
1352:                    totalOrderRollback = new DistributedRollbackToSavepoint(
1353:                            tid, savepointName);
1354:                    // Total ordering mainly for distributed virtual databases.
1355:                    // If waitForTotalOrder returns true then the query has been scheduled in
1356:                    // total order and there is no need to take a write lock later to resolve
1357:                    // potential conflicts.
1358:                    canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1359:                            false);
1360:                    if (!canTakeReadLock)
1361:                        // This is a local commit no total order info
1362:                        totalOrderRollback = null;
1363:                }
1364:
1365:                // Acquire the lock
1366:                int nbOfThreads = acquireLockAndCheckNbOfThreads(
1367:                        totalOrderRollback, "rollback " + savepointName + " "
1368:                                + tid);
1369:
1370:                // Build the list of backends that need to rollback this transaction
1371:                List rollbackList = getBackendsWithStartedTransaction(new Long(
1372:                        tid), nbOfThreads);
1373:
1374:                int nbOfThreadsToRollback = rollbackList.size();
1375:                RollbackToSavepointTask task = null;
1376:                if (nbOfThreadsToRollback != 0)
1377:                    task = new RollbackToSavepointTask(
1378:                            getNbToWait(nbOfThreadsToRollback),
1379:                            nbOfThreadsToRollback, tm, savepointName);
1380:
1381:                // Post the task in the non-conflicting queues.
1382:                synchronized (enabledBackends) {
1383:                    for (int i = 0; i < nbOfThreadsToRollback; i++) {
1384:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1385:                                .get(i);
1386:                        backend.getTaskQueues()
1387:                                .addTaskToBackendTotalOrderQueue(task);
1388:                    }
1389:                }
1390:
1391:                // Release the lock
1392:                backendListLock.releaseRead();
1393:
1394:                // Unblock next query from total order queue
1395:                if (totalOrderRollback != null)
1396:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1397:
1398:                waitForRollbackToSavepoint(task, savepointName, tm.getTimeout());
1399:            }
1400:
1401:            /**
1402:             * Release a savepoint from a transaction
1403:             * 
1404:             * @param tm The transaction marker metadata
1405:             * @param savepointName The name of the savepoint ro release
1406:             * @throws SQLException if an error occurs
1407:             */
1408:            public void releaseSavepoint(TransactionMetaData tm,
1409:                    String savepointName) throws SQLException {
1410:                long tid = tm.getTransactionId();
1411:
1412:                // Ordering for distributed virtual database
1413:                DistributedReleaseSavepoint totalOrderRelease = null;
1414:                boolean canTakeReadLock = false;
1415:                if (vdb.getTotalOrderQueue() != null) {
1416:                    totalOrderRelease = new DistributedReleaseSavepoint(tid,
1417:                            savepointName);
1418:                    // Total ordering mainly for distributed virtual databases.
1419:                    // If waitForTotalOrder returns true then the query has been scheduled in
1420:                    // total order and there is no need to take a write lock later to resolve
1421:                    // potential conflicts.
1422:                    canTakeReadLock = waitForTotalOrder(totalOrderRelease,
1423:                            false);
1424:                    if (!canTakeReadLock)
1425:                        // This is a local commit no total order info
1426:                        totalOrderRelease = null;
1427:                }
1428:
1429:                // Acquire the lock
1430:                int nbOfThreads = acquireLockAndCheckNbOfThreads(
1431:                        totalOrderRelease, "release savepoint " + savepointName
1432:                                + " " + tid);
1433:
1434:                // Build the list of backends that need to rollback this transaction
1435:                List savepointList = getBackendsWithStartedTransaction(
1436:                        new Long(tid), nbOfThreads);
1437:
1438:                int nbOfSavepoints = savepointList.size();
1439:                ReleaseSavepointTask task = null;
1440:                if (nbOfSavepoints != 0)
1441:                    task = new ReleaseSavepointTask(getNbToWait(nbOfThreads),
1442:                            nbOfThreads, tm, savepointName);
1443:
1444:                // Post the task in the non-conflicting queues.
1445:                synchronized (enabledBackends) {
1446:                    for (int i = 0; i < nbOfSavepoints; i++) {
1447:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1448:                                .get(i);
1449:                        backend.getTaskQueues()
1450:                                .addTaskToBackendTotalOrderQueue(task);
1451:                    }
1452:                }
1453:
1454:                // Release the lock
1455:                backendListLock.releaseRead();
1456:
1457:                // Unblock next query from total order queue
1458:                if (totalOrderRelease != null)
1459:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease);
1460:
1461:                waitForReleaseSavepoint(task, savepointName, tm.getTimeout());
1462:            }
1463:
1464:            /**
1465:             * Set a savepoint to a transaction.
1466:             * 
1467:             * @param tm The transaction marker metadata
1468:             * @param savepointName The name of the new savepoint
1469:             * @throws SQLException if an error occurs
1470:             */
1471:            public void setSavepoint(TransactionMetaData tm,
1472:                    String savepointName) throws SQLException {
1473:                long tid = tm.getTransactionId();
1474:
1475:                // Ordering for distributed virtual database
1476:                DistributedSetSavepoint totalOrderSavepoint = null;
1477:                boolean canTakeReadLock = false;
1478:                if (vdb.getTotalOrderQueue() != null) {
1479:                    totalOrderSavepoint = new DistributedSetSavepoint(tm
1480:                            .getLogin(), tid, savepointName);
1481:                    // Total ordering mainly for distributed virtual databases.
1482:                    // If waitForTotalOrder returns true then the query has been scheduled in
1483:                    // total order and there is no need to take a write lock later to resolve
1484:                    // potential conflicts.
1485:                    canTakeReadLock = waitForTotalOrder(totalOrderSavepoint,
1486:                            false);
1487:                    if (!canTakeReadLock)
1488:                        // This is a local commit no total order info
1489:                        totalOrderSavepoint = null;
1490:                }
1491:
1492:                // Update the recovery log
1493:                if (recoveryLog != null)
1494:                    recoveryLog.logSetSavepoint(tm, savepointName);
1495:
1496:                // Acquire the lock
1497:                String requestDescription = "set savepoint " + savepointName
1498:                        + " " + tid;
1499:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1500:                        requestDescription);
1501:
1502:                SavepointTask task = null;
1503:
1504:                // Post the task in the non-conflicting queues of all backends.
1505:                synchronized (enabledBackends) {
1506:                    if (nbOfThreads != 0) {
1507:                        task = new SavepointTask(getNbToWait(nbOfThreads),
1508:                                nbOfThreads, tm, savepointName);
1509:                        for (int i = 0; i < nbOfThreads; i++) {
1510:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1511:                                    .get(i);
1512:                            backend.getTaskQueues()
1513:                                    .addTaskToBackendTotalOrderQueue(task);
1514:                        }
1515:                    }
1516:                }
1517:
1518:                // Release the lock
1519:                backendListLock.releaseRead();
1520:
1521:                // Unblock next query from total order queue
1522:                if (totalOrderSavepoint != null)
1523:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint);
1524:
1525:                waitForSavepoint(task, savepointName, tm.getTimeout());
1526:            }
1527:
1528:            /**
1529:             * Wait for a <code>CommitTask</code> to complete
1530:             * 
1531:             * @param task the <code>CommitTask</code>
1532:             * @param timeout the timeout (in milliseconds) for task completion
1533:             * @throws SQLException if an error occurs while completing the commit task
1534:             */
1535:            // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1536:            // FIXME the timeout is a field of the CommitTask but the getter is not
1537:            // defined
1538:            private void waitForCommit(CommitTask task, long timeout)
1539:                    throws SQLException {
1540:                // Check if someone had something to commit
1541:                if (task == null)
1542:                    return;
1543:
1544:                long tid = task.getTransactionId();
1545:
1546:                synchronized (task) {
1547:                    if (!task.hasCompleted())
1548:                        waitForTaskCompletion(timeout, "commit " + tid, task);
1549:
1550:                    if (task.getSuccess() == 0) { // All tasks failed
1551:                        List exceptions = task.getExceptions();
1552:                        if (exceptions == null)
1553:                            throw new SQLException(Translate.get(
1554:                                    "loadbalancer.commit.all.failed", tid));
1555:                        else {
1556:                            SQLException ex = SQLExceptionFactory
1557:                                    .getSQLException(exceptions, Translate.get(
1558:                                            "loadbalancer.commit.failed.stack",
1559:                                            tid)
1560:                                            + "\n");
1561:                            logger.error(ex.getMessage());
1562:                            throw ex;
1563:                        }
1564:                    }
1565:                }
1566:            }
1567:
1568:            /**
1569:             * Wait for a <code>RollbackTask</code> to complete
1570:             * 
1571:             * @param task the <code>RollbackTask</code>
1572:             * @param timeout the timeout (in milliseconds) for task completion
1573:             * @throws SQLException if an error occurs while completing the rollback task
1574:             */
1575:            // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1576:            // FIXME the timeout is a field of the RollbacTask but the getter is not
1577:            // defined
1578:            private void waitForRollback(RollbackTask task, long timeout)
1579:                    throws SQLException {
1580:                // Check if someone had something to rollback
1581:                if (task == null)
1582:                    return;
1583:
1584:                long tid = task.getTransactionId();
1585:
1586:                synchronized (task) {
1587:                    if (!task.hasCompleted())
1588:                        waitForTaskCompletion(timeout, "rollback " + tid, task);
1589:
1590:                    if (task.getSuccess() == 0) { // All tasks failed
1591:                        List exceptions = task.getExceptions();
1592:                        if (exceptions == null)
1593:                            throw new SQLException(Translate.get(
1594:                                    "loadbalancer.rollback.all.failed", tid));
1595:                        else {
1596:                            SQLException ex = SQLExceptionFactory
1597:                                    .getSQLException(
1598:                                            exceptions,
1599:                                            Translate
1600:                                                    .get(
1601:                                                            "loadbalancer.rollback.failed.stack",
1602:                                                            tid)
1603:                                                    + "\n");
1604:                            logger.error(ex.getMessage());
1605:                            throw ex;
1606:                        }
1607:                    }
1608:                }
1609:            }
1610:
1611:            /**
1612:             * Wait for a <code>RollbackToSavepointTask</code> to complete
1613:             * 
1614:             * @param task the <code>RollbackToSavepointTask</code>
1615:             * @param savepoint the savepoint name
1616:             * @param timeout the timeout (in milliseconds) for task completion
1617:             * @throws SQLException if an error occurs while completing the rollback to
1618:             *           savepoint task
1619:             */
1620:            // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1621:            // FIXME the timeout is a field of the RollbackToSavepointTask but the getter
1622:            // is not defined
1623:            // FIXME the savepoint is a field of the RollbackToSavepointTask but the
1624:            // getter is not defined
1625:            private void waitForRollbackToSavepoint(
1626:                    RollbackToSavepointTask task, String savepoint, long timeout)
1627:                    throws SQLException {
1628:                // Check if someone had something to rollback
1629:                if (task == null)
1630:                    return;
1631:
1632:                long tid = task.getTransactionId();
1633:
1634:                synchronized (task) {
1635:                    if (!task.hasCompleted())
1636:                        waitForTaskCompletion(timeout, "rollback " + savepoint
1637:                                + " " + tid, task);
1638:
1639:                    if (task.getSuccess() == 0) { // All tasks failed
1640:                        List exceptions = task.getExceptions();
1641:                        if (exceptions == null)
1642:                            throw new SQLException(
1643:                                    Translate
1644:                                            .get(
1645:                                                    "loadbalancer.rollbacksavepoint.all.failed",
1646:                                                    savepoint, String
1647:                                                            .valueOf(tid)));
1648:                        else {
1649:                            SQLException ex = SQLExceptionFactory
1650:                                    .getSQLException(
1651:                                            exceptions,
1652:                                            Translate
1653:                                                    .get(
1654:                                                            "loadbalancer.rollbacksavepoint.failed.stack",
1655:                                                            savepoint,
1656:                                                            String.valueOf(tid))
1657:                                                    + "\n");
1658:                            logger.error(ex.getMessage());
1659:                            throw ex;
1660:                        }
1661:                    }
1662:                }
1663:            }
1664:
1665:            /**
1666:             * Wait for a <code>ReleaseSavepointTask</code> to complete
1667:             * 
1668:             * @param task the <code>ReleaseSavepointTask</code>
1669:             * @param savepoint the savepoint name
1670:             * @param timeout the timeout (in milliseconds) for task completion
1671:             * @throws SQLException if an error occurs while completing the release
1672:             *           savepoint task
1673:             */
1674:            // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1675:            // FIXME the timeout is a field of the ReleaseSavepointTask but the getter is
1676:            // not defined
1677:            // FIXME the savepoint is a field of the ReleaseSavepointTask but the getter
1678:            // is not defined
1679:            private void waitForReleaseSavepoint(ReleaseSavepointTask task,
1680:                    String savepoint, long timeout) throws SQLException {
1681:                // Check if someone had something to release
1682:                if (task == null)
1683:                    return;
1684:
1685:                long tid = task.getTransactionId();
1686:
1687:                synchronized (task) {
1688:                    if (!task.hasCompleted())
1689:                        waitForTaskCompletion(timeout, "release savepoint "
1690:                                + savepoint + " " + tid, task);
1691:
1692:                    if (task.getSuccess() == 0) { // All tasks failed
1693:                        List exceptions = task.getExceptions();
1694:                        if (exceptions == null)
1695:                            throw new SQLException(Translate.get(
1696:                                    "loadbalancer.releasesavepoint.all.failed",
1697:                                    savepoint, String.valueOf(tid)));
1698:                        else {
1699:                            SQLException ex = SQLExceptionFactory
1700:                                    .getSQLException(
1701:                                            exceptions,
1702:                                            Translate
1703:                                                    .get(
1704:                                                            "loadbalancer.releasesavepoint.failed.stack",
1705:                                                            savepoint,
1706:                                                            String.valueOf(tid))
1707:                                                    + "\n");
1708:                            logger.error(ex.getMessage());
1709:                            throw ex;
1710:                        }
1711:                    }
1712:                }
1713:            }
1714:
1715:            /**
1716:             * Wait for a <code>SavepointTask</code> to complete
1717:             * 
1718:             * @param task the <code>SavepointTask</code>
1719:             * @param savepoint the savepoint name
1720:             * @param timeout the timeout (in milliseconds) for task completion
1721:             * @throws SQLException if an error occurs while completing the savepoint task
1722:             */
1723:            // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1724:            // FIXME the timeout is a field of the SavepointTask but the getter is not
1725:            // defined
1726:            // FIXME the savepoint is a field of the SavepointTask but the getter is not
1727:            // defined
1728:            private void waitForSavepoint(SavepointTask task, String savepoint,
1729:                    long timeout) throws SQLException {
1730:                // Check if someone had something to release
1731:                if (task == null)
1732:                    return;
1733:
1734:                long tid = task.getTransactionId();
1735:
1736:                synchronized (task) {
1737:                    if (!task.hasCompleted())
1738:                        waitForTaskCompletion(timeout, "set savepoint "
1739:                                + savepoint + " " + tid, task);
1740:
1741:                    if (task.getSuccess() == 0) { // All tasks failed
1742:                        List exceptions = task.getExceptions();
1743:                        if (exceptions == null)
1744:                            throw new SQLException(Translate.get(
1745:                                    "loadbalancer.setsavepoint.all.failed",
1746:                                    savepoint, String.valueOf(tid)));
1747:                        else {
1748:                            SQLException ex = SQLExceptionFactory
1749:                                    .getSQLException(
1750:                                            exceptions,
1751:                                            Translate
1752:                                                    .get(
1753:                                                            "loadbalancer.setsavepoint.failed.stack",
1754:                                                            savepoint,
1755:                                                            String.valueOf(tid))
1756:                                                    + "\n");
1757:                            logger.error(ex.getMessage());
1758:                            throw ex;
1759:                        }
1760:                    }
1761:                }
1762:            }
1763:
1764:            //
1765:            // Utility functions
1766:            //
1767:            /**
1768:             * Atomically posts the task in the total order queue of every backend of the
1769:             * write list, then releases the read lock held on the backend list.
1770:             * 
1771:             * @param request the request to remove from the total order queue, if asked
1772:             * @param task the task to post
1773:             * @param writeList backend list to post the task to
1774:             * @param removeFromTotalOrderQueue true to remove the request from that queue
1775:             */
1776:            private void atomicTaskPostInQueueAndReleaseLock(
1777:                    AbstractRequest request, AbstractTask task, List writeList,
1778:                    boolean removeFromTotalOrderQueue) {
1779:                synchronized (enabledBackends) {
1780:                    int nbOfThreads = writeList.size();
1781:                    for (int i = 0; i < nbOfThreads; i++) {
1782:                        BackendTaskQueues queues = ((DatabaseBackend) writeList
1783:                                .get(i)).getTaskQueues();
1784:                        queues.addTaskToBackendTotalOrderQueue(task);
1785:                    }
1786:                }
1787:
1788:                backendListLock.releaseRead();
1789:
1790:                // Unblock next query from total order queue
1791:                if (removeFromTotalOrderQueue) {
1792:                    removeObjectFromAndNotifyTotalOrderQueue(request);
1793:                }
1794:            }
1795:
1796:            /**
1797:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1798:             *      long)
1799:             */
1800:            public void closePersistentConnection(String login,
1801:                    long persistentConnectionId) throws SQLException {
1802:                /*
1803:                 * We assume a synchronous execution and connection closing can only come
1804:                 * after all requests have been executed in that connection. We post to all
1805:                 * backends and let the task deal with whether that backend had a persistent
1806:                 * connection or not.
1807:                 */
1808:
1809:                String requestDescription = "closing persistent connection "
1810:                        + persistentConnectionId;
1811:                int nbOfThreads = 0;
1812:
1813:                DistributedClosePersistentConnection totalOrderQueueObject = null;
1814:
1815:                boolean removefromTotalOrder = false;
1816:                if (vdb.getTotalOrderQueue() != null) {
1817:                    totalOrderQueueObject = new DistributedClosePersistentConnection(
1818:                            login, persistentConnectionId);
1819:                    removefromTotalOrder = waitForTotalOrder(
1820:                            totalOrderQueueObject, true);
1821:                }
1822:
1823:                ClosePersistentConnectionTask task = null;
1824:                try {
1825:                    nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1826:                            requestDescription);
1827:
1828:                    task = new ClosePersistentConnectionTask(
1829:                            getNbToWait(nbOfThreads), nbOfThreads, login,
1830:                            persistentConnectionId);
1831:
1832:                    // Post the task in the non-conflicting queues.
1833:                    synchronized (enabledBackends) {
1834:                        for (int i = 0; i < nbOfThreads; i++) {
1835:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1836:                                    .get(i);
1837:                            backend.getTaskQueues()
1838:                                    .addTaskToBackendTotalOrderQueue(task);
1839:                        }
1840:                    }
1841:
1842:                    // Release the lock
1843:                    backendListLock.releaseRead();
1844:
1845:                    if (removefromTotalOrder)
1846:                        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1847:                    totalOrderQueueObject = null;
1848:
1849:                    synchronized (task) {
1850:                        if (!task.hasCompleted())
1851:                            try {
1852:                                waitForTaskCompletion(0, requestDescription,
1853:                                        task);
1854:                            } catch (SQLException ignore) {
1855:                            }
1856:                    }
1857:                } finally {
1858:                    if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1859:                        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1860:                    }
1861:
1862:                    if (logger.isDebugEnabled())
1863:                        logger.debug(requestDescription + " completed on "
1864:                                + nbOfThreads + " backends.");
1865:                }
1866:            }
1867:
1868:            /**
1869:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1870:             *      long)
1871:             */
1872:            public void openPersistentConnection(String login,
1873:                    long persistentConnectionId) throws SQLException {
1874:                String requestDescription = "opening persistent connection "
1875:                        + persistentConnectionId;
1876:                int nbOfThreads = 0;
1877:
1878:                DistributedOpenPersistentConnection totalOrderQueueObject = null;
1879:                if (vdb.getTotalOrderQueue() != null) {
1880:                    totalOrderQueueObject = new DistributedOpenPersistentConnection(
1881:                            login, persistentConnectionId);
1882:                    waitForTotalOrder(totalOrderQueueObject, true);
1883:                }
1884:
1885:                OpenPersistentConnectionTask task = null;
1886:                try {
1887:                    nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1888:                            requestDescription);
1889:
1890:                    task = new OpenPersistentConnectionTask(
1891:                            getNbToWait(nbOfThreads), nbOfThreads, login,
1892:                            persistentConnectionId);
1893:
1894:                    // Post the task in the non-conflicting queues.
1895:                    synchronized (enabledBackends) {
1896:                        for (int i = 0; i < nbOfThreads; i++) {
1897:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1898:                                    .get(i);
1899:                            backend.getTaskQueues()
1900:                                    .addTaskToBackendTotalOrderQueue(task);
1901:                        }
1902:                    }
1903:
1904:                    // Release the lock
1905:                    backendListLock.releaseRead();
1906:
1907:                    removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1908:                    totalOrderQueueObject = null;
1909:
1910:                    synchronized (task) {
1911:                        if (!task.hasCompleted())
1912:                            try {
1913:                                waitForTaskCompletion(0, requestDescription,
1914:                                        task);
1915:                            } catch (SQLException ignore) {
1916:                            }
1917:                    }
1918:                } finally {
1919:                    if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1920:                        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1921:                    }
1922:
1923:                    if (logger.isDebugEnabled())
1924:                        logger.debug(requestDescription + " completed on "
1925:                                + nbOfThreads + " backends.");
1926:                }
1927:            }
1928:
1929:            /**
1930:             * Enables a Backend that was previously disabled.
1931:             * <p>
1932:             * Ask the corresponding connection manager to initialize the connections if
1933:             * needed.
1934:             * <p>
1935:             * No sanity checks are performed by this function.
1936:             * 
1937:             * @param db the database backend to enable
1938:             * @param writeEnabled True if the backend must be enabled for writes
1939:             * @throws SQLException if an error occurs
1940:             */
1941:            public synchronized void enableBackend(DatabaseBackend db,
1942:                    boolean writeEnabled) throws SQLException {
1943:                if (!db.isInitialized())
1944:                    db.initializeConnections();
1945:                // Write-enabled backends get fresh task queues and worker threads
1946:                if (writeEnabled && db.isWriteCanBeEnabled()) {
1947:                    db.setTaskQueues(new BackendTaskQueues(db, waitForCompletionPolicy,
1948:                            this.vdb.getRequestManager()));
1949:                    db.startWorkerThreads(this);
1950:                    db.enableWrite();
1951:                }
1952:                db.enableRead();
1953:
1954:                // Remember whether the write lock is really held before releasing it
1955:                boolean writeLocked = false;
1956:                try {
1957:                    backendListLock.acquireWrite();
1958:                    writeLocked = true;
1959:                } catch (InterruptedException e) {
1960:                    logger.error("Error while acquiring write lock in enableBackend", e);
1961:                    Thread.currentThread().interrupt(); // do not swallow the interruption
1962:                }
1963:                synchronized (enabledBackends) {
1964:                    enabledBackends.add(db);
1965:                }
1966:                if (writeLocked)
1967:                    backendListLock.releaseWrite();
1968:            }
1969:
1970:            /**
1971:             * Disables a backend that was previously enabled.
1972:             * <p>
1973:             * Ask the corresponding connection manager to finalize the connections if
1974:             * needed.
1975:             * <p>
1976:             * No sanity checks are performed by this function.
1977:             * 
1978:             * @param db the database backend to disable
1979:             * @param forceDisable true if disabling must be forced on the backend
1980:             * @throws SQLException if an error occurs
1981:             */
1982:            public void disableBackend(DatabaseBackend db, boolean forceDisable)
1983:                    throws SQLException {
1984:                if (!db.disable()) {
1985:                    // Another thread has already started the disable process
1986:                    return;
1987:                }
1988:                synchronized (this ) {
1989:                    try {
1990:                        backendListLock.acquireWrite();
1991:                    } catch (InterruptedException e) {
1992:                        logger
1993:                                .error(
1994:                                        "Error while acquiring write lock in enableBackend",
1995:                                        e);
1996:                    }
1997:
1998:                    try {
1999:                        synchronized (enabledBackends) {
2000:                            enabledBackends.remove(db);
2001:                            if (enabledBackends.isEmpty()) {
2002:                                // Cleanup schema for any remaining locks
2003:                                this .vdb.getRequestManager().setDatabaseSchema(
2004:                                        null, false);
2005:                            }
2006:                        }
2007:
2008:                        if (!forceDisable)
2009:                            terminateThreadsAndConnections(db);
2010:                    } finally {
2011:                        backendListLock.releaseWrite();
2012:                    }
2013:
2014:                    if (forceDisable) {
2015:                        db.shutdownConnectionManagers();
2016:                        terminateThreadsAndConnections(db);
2017:                    }
2018:
2019:                    // sanity check on backend's active transaction
2020:                    if (!db.getActiveTransactions().isEmpty()) {
2021:                        if (logger.isWarnEnabled()) {
2022:                            logger.warn("Active transactions after backend "
2023:                                    + db.getName() + " is disabled: "
2024:                                    + db.getActiveTransactions());
2025:                        }
2026:                    }
2027:
2028:                }
2029:            }
2030:
2031:            private void terminateThreadsAndConnections(DatabaseBackend db)
2032:                    throws SQLException {
2033:                db.terminateWorkerThreads(); // worker threads are stopped first
2034:                if (db.isInitialized()) {
2035:                    db.finalizeConnections(); // only initialized backends have connections to finalize
2036:                }
2037:            }
2038:
2039:            /**
2040:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2041:             */
2042:            public String getXmlImpl() {
2043:                StringBuffer info = new StringBuffer("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2044:                if (createTablePolicy != null)
2045:                    info.append(createTablePolicy.getXml());
2046:                if (waitForCompletionPolicy != null)
2047:                    info.append(waitForCompletionPolicy.getXml());
2048:                if (macroHandler != null)
2049:                    info.append(macroHandler.getXml());
2050:                // The subclass-specific XML has to be part of the returned document
2051:                info.append(this.getRaidb2Xml());
2052:                info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2053:                return info.toString();
2054:            }
2055:
2056:            /**
2057:             * Returns XML formatted information specific to this RAIDb-2 load balancer.
2058:             * 
2059:             * @return the XML formatted string
2060:             */
2061:            public abstract String getRaidb2Xml();
2062:
2063:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.