Source Code Cross Referenced for RAIDb0.java in  » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » loadbalancer » raidb0 » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.loadbalancer.raidb0 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004:         * Science And Control (INRIA).
0005:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006:         * Contact: sequoia@continuent.org
0007:         * 
0008:         * Licensed under the Apache License, Version 2.0 (the "License");
0009:         * you may not use this file except in compliance with the License.
0010:         * You may obtain a copy of the License at
0011:         * 
0012:         * http://www.apache.org/licenses/LICENSE-2.0
0013:         * 
0014:         * Unless required by applicable law or agreed to in writing, software
0015:         * distributed under the License is distributed on an "AS IS" BASIS,
0016:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017:         * See the License for the specific language governing permissions and
0018:         * limitations under the License. 
0019:         *
0020:         * Initial developer(s): Emmanuel Cecchet. 
0021:         * Contributor(s): Jaco Swart, Jean-Bernard van Zuylen
0022:         */package org.continuent.sequoia.controller.loadbalancer.raidb0;
0023:
0024:        import java.sql.Connection;
0025:        import java.sql.SQLException;
0026:        import java.util.ArrayList;
0027:        import java.util.Collection;
0028:        import java.util.Iterator;
0029:        import java.util.List;
0030:
0031:        import org.continuent.sequoia.common.exceptions.BadConnectionException;
0032:        import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0033:        import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0034:        import org.continuent.sequoia.common.exceptions.NotImplementedException;
0035:        import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
0036:        import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0037:        import org.continuent.sequoia.common.i18n.Translate;
0038:        import org.continuent.sequoia.common.log.Trace;
0039:        import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0040:        import org.continuent.sequoia.controller.backend.DatabaseBackend;
0041:        import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0042:        import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0043:        import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0044:        import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0045:        import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0046:        import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0047:        import org.continuent.sequoia.controller.connection.PooledConnection;
0048:        import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0049:        import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0050:        import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
0051:        import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0052:        import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableException;
0053:        import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
0054:        import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableRule;
0055:        import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
0056:        import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
0057:        import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
0058:        import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
0059:        import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0060:        import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
0061:        import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
0062:        import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0063:        import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0064:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0065:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0066:        import org.continuent.sequoia.controller.requests.ParsingGranularities;
0067:        import org.continuent.sequoia.controller.requests.SelectRequest;
0068:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0069:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0070:
0071:        /**
0072:         * RAIDb-0: database partitioning.
0073:         * <p>
0074:         * The requests are sent to the backend nodes hosting the tables needed to
0075:         * execute the request. If no backend has the needed tables to perform a
0076:         * request, it will fail.
0077:         * 
0078:         * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0079:         * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
0080:         * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0081:         *         </a>
0082:         * @version 1.0
0083:         */
0084:        public class RAIDb0 extends AbstractLoadBalancer {
0085:            //
0086:            // How the code is organized?
0087:            // 1. Member variables
0088:            // 2. Constructor(s)
0089:            // 3. Request handling
0090:            // 4. Transaction handling
0091:            // 5. Backend management
0092:            // 6. Debug/Monitoring
0093:            //
0094:
0095:            private CreateTablePolicy createTablePolicy;
0096:            protected static Trace logger = Trace
0097:                    .getLogger("org.continuent.sequoia.controller.loadbalancer.RAIDb0");
0098:
0099:            /*
0100:             * Constructors
0101:             */
0102:
0103:            /**
0104:             * Creates a new RAIDb-0 request load balancer.
0105:             * 
0106:             * @param vdb the virtual database this load balancer belongs to.
0107:             * @param createTablePolicy the policy defining how 'create table' statements
0108:             *          should be handled
0109:             * @throws Exception if an error occurs
0110:             */
0111:            public RAIDb0(VirtualDatabase vdb,
0112:                    CreateTablePolicy createTablePolicy) throws Exception {
0113:                super (vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE);
0114:                this .createTablePolicy = createTablePolicy;
0115:                this .waitForCompletionPolicy = new WaitForCompletionPolicy(
0116:                        WaitForCompletionPolicy.ALL, false, 0);
0117:            }
0118:
0119:            /*
0120:             * Request Handling
0121:             */
0122:
0123:            /**
0124:             * Performs a read request on the backend that has the needed tables to
0125:             * executes the request.
0126:             * 
0127:             * @param request an <code>SelectRequest</code>
0128:             * @param metadataCache the metadataCache if any or null
0129:             * @return the corresponding <code>java.sql.ResultSet</code>
0130:             * @exception SQLException if an error occurs
0131:             * @see AbstractLoadBalancer#statementExecuteQuery(SelectRequest,
0132:             *      MetadataCache)
0133:             */
0134:            public ControllerResultSet statementExecuteQuery(
0135:                    SelectRequest request, MetadataCache metadataCache)
0136:                    throws SQLException {
0137:                try {
0138:                    vdb.acquireReadLockBackendLists(); // Acquire read lock
0139:                } catch (InterruptedException e) {
0140:                    String msg = Translate.get(
0141:                            "loadbalancer.backendlist.acquire.readlock.failed",
0142:                            e);
0143:                    logger.error(msg);
0144:                    throw new SQLException(msg);
0145:                }
0146:
0147:                try {
0148:                    ControllerResultSet rs = null;
0149:                    Collection fromTables = request.getFrom();
0150:
0151:                    if (fromTables == null)
0152:                        throw new SQLException(Translate.get(
0153:                                "loadbalancer.from.not.found", request
0154:                                        .getSqlShortForm(vdb
0155:                                                .getSqlShortFormLength())));
0156:
0157:                    // Find the backend that has the needed tables
0158:                    ArrayList backends = vdb.getBackends();
0159:                    int size = backends.size();
0160:                    int enabledBackends = 0;
0161:
0162:                    DatabaseBackend backend = null;
0163:                    // The backend that will execute the query
0164:                    for (int i = 0; i < size; i++) {
0165:                        backend = (DatabaseBackend) backends.get(i);
0166:                        if (backend.isReadEnabled())
0167:                            enabledBackends++;
0168:                        if (backend.isReadEnabled()
0169:                                && backend.hasTables(fromTables))
0170:                            break;
0171:                        else
0172:                            backend = null;
0173:                    }
0174:
0175:                    if (backend == null) {
0176:                        if (enabledBackends == 0)
0177:                            throw new NoMoreBackendException(Translate.get(
0178:                                    "loadbalancer.execute.no.backend.enabled",
0179:                                    request.getId()));
0180:                        else
0181:                            throw new SQLException(Translate.get(
0182:                                    "loadbalancer.backend.no.required.tables",
0183:                                    fromTables.toString()));
0184:                    }
0185:
0186:                    if (logger.isDebugEnabled()) {
0187:                        logger.debug("Backend " + backend.getName()
0188:                                + " has all tables which are:");
0189:                        for (Iterator iter = fromTables.iterator(); iter
0190:                                .hasNext();)
0191:                            logger.debug(iter.next());
0192:                    }
0193:
0194:                    // Execute the request on the chosen backend
0195:                    try {
0196:                        rs = executeRequestOnBackend(request, backend,
0197:                                metadataCache);
0198:                    } catch (SQLException se) {
0199:                        String msg = Translate.get(
0200:                                "loadbalancer.request.failed", new String[] {
0201:                                        String.valueOf(request.getId()),
0202:                                        se.getMessage() });
0203:                        if (logger.isInfoEnabled())
0204:                            logger.info(msg);
0205:                        throw new SQLException(msg);
0206:                    }
0207:
0208:                    return rs;
0209:                } catch (RuntimeException e) {
0210:                    String msg = Translate.get("loadbalancer.request.failed",
0211:                            new String[] {
0212:                                    request.getSqlShortForm(vdb
0213:                                            .getSqlShortFormLength()),
0214:                                    e.getMessage() });
0215:                    logger.fatal(msg, e);
0216:                    throw new SQLException(msg);
0217:                } finally {
0218:                    vdb.releaseReadLockBackendLists(); // Release the lock
0219:                }
0220:            }
0221:
0222:            /**
0223:             * Performs a write request on the backend that has the needed tables to
0224:             * executes the request.
0225:             * 
0226:             * @param request an <code>AbstractWriteRequest</code>
0227:             * @return number of rows affected by the request
0228:             * @exception SQLException if an error occurs
0229:             */
0230:            public ExecuteUpdateResult statementExecuteUpdate(
0231:                    AbstractWriteRequest request) throws SQLException {
0232:                // Handle macros
0233:                handleMacros(request);
0234:
0235:                try {
0236:                    vdb.acquireReadLockBackendLists(); // Acquire read lock
0237:                } catch (InterruptedException e) {
0238:                    String msg = Translate.get(
0239:                            "loadbalancer.backendlist.acquire.readlock.failed",
0240:                            e);
0241:                    logger.error(msg);
0242:                    throw new SQLException(msg);
0243:                }
0244:
0245:                boolean success = false;
0246:                try {
0247:                    // Log lazy begin if needed
0248:                    if (request.isLazyTransactionStart())
0249:                        this .vdb.getRequestManager().logLazyTransactionBegin(
0250:                                request.getTransactionId());
0251:
0252:                    // Log request
0253:                    if (recoveryLog != null)
0254:                        recoveryLog.logRequestExecuting(request);
0255:
0256:                    String table = request.getTableName();
0257:                    AbstractConnectionManager cm = null;
0258:
0259:                    if (table == null)
0260:                        throw new SQLException(Translate.get(
0261:                                "loadbalancer.request.target.table.not.found",
0262:                                request.getSqlShortForm(vdb
0263:                                        .getSqlShortFormLength())));
0264:
0265:                    // Find the backend that has the needed table
0266:                    ArrayList backends = vdb.getBackends();
0267:                    int size = backends.size();
0268:
0269:                    DatabaseBackend backend = null;
0270:                    // The backend that will execute the query
0271:                    if (request.isCreate()) { // Choose the backend according to the defined policy
0272:                        CreateTableRule rule = createTablePolicy
0273:                                .getTableRule(request.getTableName());
0274:                        if (rule == null)
0275:                            rule = createTablePolicy.getDefaultRule();
0276:
0277:                        // Ask the rule to pickup a backend
0278:                        ArrayList choosen;
0279:                        try {
0280:                            choosen = rule.getBackends(backends);
0281:                        } catch (CreateTableException e) {
0282:                            throw new SQLException(Translate.get(
0283:                                    "loadbalancer.create.table.rule.failed", e
0284:                                            .getMessage()));
0285:                        }
0286:
0287:                        // Get the connection manager from the chosen backend
0288:                        if (choosen != null)
0289:                            backend = (DatabaseBackend) choosen.get(0);
0290:                        if (backend != null)
0291:                            cm = backend.getConnectionManager(request
0292:                                    .getLogin());
0293:                    } else { // Find the backend that has the table
0294:                        for (int i = 0; i < size; i++) {
0295:                            backend = (DatabaseBackend) backends.get(i);
0296:                            if ((backend.isWriteEnabled() || backend
0297:                                    .isDisabling())
0298:                                    && backend.hasTable(table)) {
0299:                                cm = backend.getConnectionManager(request
0300:                                        .getLogin());
0301:                                break;
0302:                            }
0303:                        }
0304:                    }
0305:
0306:                    // Sanity check
0307:                    if (cm == null)
0308:                        throw new SQLException(Translate
0309:                                .get("loadbalancer.backend.no.required.table",
0310:                                        table));
0311:
0312:                    // Ok, let's execute the query
0313:
0314:                    if (request.isAutoCommit()) {
0315:                        // We do not execute request outside the already open transactions if we
0316:                        // are disabling the backend.
0317:                        if (!backend.canAcceptTasks(request))
0318:                            throw new SQLException(Translate.get(
0319:                                    "loadbalancer.backend.is.disabling",
0320:                                    new String[] {
0321:                                            request.getSqlShortForm(vdb
0322:                                                    .getSqlShortFormLength()),
0323:                                            backend.getName() }));
0324:
0325:                        // Use a connection just for this request
0326:                        PooledConnection c = null;
0327:                        try {
0328:                            c = cm.retrieveConnectionInAutoCommit(request);
0329:                        } catch (UnreachableBackendException e1) {
0330:                            logger
0331:                                    .error(Translate
0332:                                            .get(
0333:                                                    "loadbalancer.backend.disabling.unreachable",
0334:                                                    backend.getName()));
0335:                            disableBackend(backend, true);
0336:                            throw new SQLException(Translate.get(
0337:                                    "loadbalancer.backend.unreacheable",
0338:                                    backend.getName()));
0339:                        }
0340:
0341:                        // Sanity check
0342:                        if (c == null)
0343:                            throw new SQLException(Translate.get(
0344:                                    "loadbalancer.backend.no.connection",
0345:                                    backend.getName()));
0346:
0347:                        ExecuteUpdateResult result;
0348:                        try {
0349:                            result = executeStatementExecuteUpdateOnBackend(
0350:                                    request, backend, null, c);
0351:                        } catch (Exception e) {
0352:                            throw new SQLException(Translate.get(
0353:                                    "loadbalancer.request.failed",
0354:                                    new String[] {
0355:                                            request.getSqlShortForm(vdb
0356:                                                    .getSqlShortFormLength()),
0357:                                            e.getMessage() }));
0358:                        } finally {
0359:                            cm.releaseConnectionInAutoCommit(request, c);
0360:                        }
0361:                        if (logger.isDebugEnabled())
0362:                            logger.debug(Translate.get(
0363:                                    "loadbalancer.execute.on", new String[] {
0364:                                            String.valueOf(request.getId()),
0365:                                            backend.getName() }));
0366:                        return result;
0367:                    } else { // Inside a transaction
0368:                        Connection c;
0369:                        long tid = request.getTransactionId();
0370:
0371:                        try {
0372:                            c = backend
0373:                                    .getConnectionForTransactionAndLazyBeginIfNeeded(
0374:                                            request, cm);
0375:                        } catch (UnreachableBackendException e1) {
0376:                            logger
0377:                                    .error(Translate
0378:                                            .get(
0379:                                                    "loadbalancer.backend.disabling.unreachable",
0380:                                                    backend.getName()));
0381:                            disableBackend(backend, true);
0382:                            throw new SQLException(Translate.get(
0383:                                    "loadbalancer.backend.unreacheable",
0384:                                    backend.getName()));
0385:                        } catch (NoTransactionStartWhenDisablingException e) {
0386:                            String msg = Translate.get(
0387:                                    "loadbalancer.backend.is.disabling",
0388:                                    new String[] {
0389:                                            request.getSqlShortForm(vdb
0390:                                                    .getSqlShortFormLength()),
0391:                                            backend.getName() });
0392:                            logger.error(msg);
0393:                            throw new SQLException(msg);
0394:                        }
0395:
0396:                        // Sanity check
0397:                        if (c == null)
0398:                            throw new SQLException(Translate.get(
0399:                                    "loadbalancer.unable.retrieve.connection",
0400:                                    new String[] { String.valueOf(tid),
0401:                                            backend.getName() }));
0402:
0403:                        // Execute the query
0404:                        ExecuteUpdateResult result;
0405:                        try {
0406:                            result = executeStatementExecuteUpdateOnBackend(
0407:                                    request, backend, null,
0408:                                    cm.retrieveConnectionForTransaction(tid));
0409:                        } catch (Exception e) {
0410:                            throw new SQLException(Translate.get(
0411:                                    "loadbalancer.request.failed",
0412:                                    new String[] {
0413:                                            request.getSqlShortForm(vdb
0414:                                                    .getSqlShortFormLength()),
0415:                                            e.getMessage() }));
0416:                        }
0417:                        if (logger.isDebugEnabled())
0418:                            logger.debug(Translate.get(
0419:                                    "loadbalancer.execute.on", new String[] {
0420:                                            String.valueOf(request.getId()),
0421:                                            backend.getName() }));
0422:                        success = true;
0423:                        return result;
0424:                    }
0425:                } catch (RuntimeException e) {
0426:                    String msg = Translate.get("loadbalancer.request.failed",
0427:                            new String[] {
0428:                                    request.getSqlShortForm(vdb
0429:                                            .getSqlShortFormLength()),
0430:                                    e.getMessage() });
0431:                    logger.fatal(msg, e);
0432:                    throw new SQLException(msg);
0433:                } finally {
0434:                    vdb.releaseReadLockBackendLists(); // Release the lock
0435:                    if (!success)
0436:                        recoveryLog.logRequestCompletion(request.getLogId(),
0437:                                false, request.getExecTimeInMs());
0438:                }
0439:            }
0440:
0441:            /**
0442:             * @see AbstractLoadBalancer#statementExecuteUpdateWithKeys(AbstractWriteRequest,
0443:             *      MetadataCache)
0444:             */
0445:            public GeneratedKeysResult statementExecuteUpdateWithKeys(
0446:                    AbstractWriteRequest request, MetadataCache metadataCache)
0447:                    throws SQLException {
0448:                // Handle macros
0449:                handleMacros(request);
0450:
0451:                try {
0452:                    vdb.acquireReadLockBackendLists(); // Acquire
0453:                    // read
0454:                    // lock
0455:                } catch (InterruptedException e) {
0456:                    String msg = Translate.get(
0457:                            "loadbalancer.backendlist.acquire.readlock.failed",
0458:                            e);
0459:                    logger.error(msg);
0460:                    throw new SQLException(msg);
0461:                }
0462:
0463:                boolean success = false;
0464:                try {
0465:                    // Log lazy begin if needed
0466:                    if (request.isLazyTransactionStart())
0467:                        this .vdb.getRequestManager().logLazyTransactionBegin(
0468:                                request.getTransactionId());
0469:
0470:                    // Log request
0471:                    if (recoveryLog != null)
0472:                        recoveryLog.logRequestExecuting(request);
0473:
0474:                    String table = request.getTableName();
0475:                    AbstractConnectionManager cm = null;
0476:
0477:                    if (table == null)
0478:                        throw new SQLException(Translate.get(
0479:                                "loadbalancer.request.target.table.not.found",
0480:                                request.getSqlShortForm(vdb
0481:                                        .getSqlShortFormLength())));
0482:
0483:                    // Find the backend that has the needed table
0484:                    ArrayList backends = vdb.getBackends();
0485:                    int size = backends.size();
0486:
0487:                    DatabaseBackend backend = null;
0488:                    // The backend that will execute the query
0489:                    if (request.isCreate()) { // Choose the backend according to the defined policy
0490:                        CreateTableRule rule = createTablePolicy
0491:                                .getTableRule(request.getTableName());
0492:                        if (rule == null)
0493:                            rule = createTablePolicy.getDefaultRule();
0494:
0495:                        // Ask the rule to pickup a backend
0496:                        ArrayList choosen;
0497:                        try {
0498:                            choosen = rule.getBackends(backends);
0499:                        } catch (CreateTableException e) {
0500:                            throw new SQLException(Translate.get(
0501:                                    "loadbalancer.create.table.rule.failed", e
0502:                                            .getMessage()));
0503:                        }
0504:
0505:                        // Get the connection manager from the chosen backend
0506:                        if (choosen != null)
0507:                            backend = (DatabaseBackend) choosen.get(0);
0508:                        if (backend != null)
0509:                            cm = backend.getConnectionManager(request
0510:                                    .getLogin());
0511:                    } else { // Find the backend that has the table
0512:                        for (int i = 0; i < size; i++) {
0513:                            backend = (DatabaseBackend) backends.get(i);
0514:                            if ((backend.isWriteEnabled() || backend
0515:                                    .isDisabling())
0516:                                    && backend.hasTable(table)) {
0517:                                cm = backend.getConnectionManager(request
0518:                                        .getLogin());
0519:                                break;
0520:                            }
0521:                        }
0522:                    }
0523:
0524:                    // Sanity check
0525:                    if (cm == null)
0526:                        throw new SQLException(Translate
0527:                                .get("loadbalancer.backend.no.required.table",
0528:                                        table));
0529:
0530:                    if (!backend.getDriverCompliance()
0531:                            .supportGetGeneratedKeys())
0532:                        throw new SQLException(
0533:                                Translate
0534:                                        .get(
0535:                                                "loadbalancer.backend.autogeneratedkeys.unsupported",
0536:                                                backend.getName()));
0537:
0538:                    // Ok, let's execute the query
0539:
0540:                    if (request.isAutoCommit()) {
0541:                        // We do not execute request outside the already open transactions if we
0542:                        // are disabling the backend.
0543:                        if (!backend.canAcceptTasks(request))
0544:                            throw new SQLException(Translate.get(
0545:                                    "loadbalancer.backend.is.disabling",
0546:                                    new String[] {
0547:                                            request.getSqlShortForm(vdb
0548:                                                    .getSqlShortFormLength()),
0549:                                            backend.getName() }));
0550:
0551:                        // Use a connection just for this request
0552:                        PooledConnection c = null;
0553:                        try {
0554:                            c = cm.retrieveConnectionInAutoCommit(request);
0555:                        } catch (UnreachableBackendException e1) {
0556:                            logger
0557:                                    .error(Translate
0558:                                            .get(
0559:                                                    "loadbalancer.backend.disabling.unreachable",
0560:                                                    backend.getName()));
0561:                            disableBackend(backend, true);
0562:                            throw new SQLException(Translate.get(
0563:                                    "loadbalancer.backend.unreacheable",
0564:                                    backend.getName()));
0565:                        }
0566:
0567:                        // Sanity check
0568:                        if (c == null)
0569:                            throw new SQLException(Translate.get(
0570:                                    "loadbalancer.backend.no.connection",
0571:                                    backend.getName()));
0572:
0573:                        // Execute Query
0574:                        GeneratedKeysResult result;
0575:                        try {
0576:                            result = executeStatementExecuteUpdateWithKeysOnBackend(
0577:                                    request, backend, null, c, metadataCache);
0578:                        } catch (Exception e) {
0579:                            throw new SQLException(Translate.get(
0580:                                    "loadbalancer.request.failed",
0581:                                    new String[] {
0582:                                            request.getSqlShortForm(vdb
0583:                                                    .getSqlShortFormLength()),
0584:                                            e.getMessage() }));
0585:                        } finally {
0586:                            backend.removePendingRequest(request);
0587:                            cm.releaseConnectionInAutoCommit(request, c);
0588:                        }
0589:                        if (logger.isDebugEnabled())
0590:                            logger.debug(Translate.get(
0591:                                    "loadbalancer.execute.on", new String[] {
0592:                                            String.valueOf(request.getId()),
0593:                                            backend.getName() }));
0594:                        return result;
0595:                    } else { // Inside a transaction
0596:                        Connection c;
0597:                        long tid = request.getTransactionId();
0598:
0599:                        try {
0600:                            c = backend
0601:                                    .getConnectionForTransactionAndLazyBeginIfNeeded(
0602:                                            request, cm);
0603:                        } catch (UnreachableBackendException e1) {
0604:                            logger
0605:                                    .error(Translate
0606:                                            .get(
0607:                                                    "loadbalancer.backend.disabling.unreachable",
0608:                                                    backend.getName()));
0609:                            disableBackend(backend, true);
0610:                            throw new SQLException(Translate.get(
0611:                                    "loadbalancer.backend.unreacheable",
0612:                                    backend.getName()));
0613:                        } catch (NoTransactionStartWhenDisablingException e) {
0614:                            String msg = Translate.get(
0615:                                    "loadbalancer.backend.is.disabling",
0616:                                    new String[] {
0617:                                            request.getSqlShortForm(vdb
0618:                                                    .getSqlShortFormLength()),
0619:                                            backend.getName() });
0620:                            logger.error(msg);
0621:                            throw new SQLException(msg);
0622:                        }
0623:
0624:                        // Sanity check
0625:                        if (c == null)
0626:                            throw new SQLException(Translate.get(
0627:                                    "loadbalancer.unable.retrieve.connection",
0628:                                    new String[] { String.valueOf(tid),
0629:                                            backend.getName() }));
0630:
0631:                        // Execute the query
0632:                        GeneratedKeysResult result;
0633:                        try {
0634:                            result = executeStatementExecuteUpdateWithKeysOnBackend(
0635:                                    request, backend, null,
0636:                                    cm.retrieveConnectionForTransaction(tid),
0637:                                    metadataCache);
0638:                        } catch (Exception e) {
0639:                            throw new SQLException(Translate.get(
0640:                                    "loadbalancer.request.failed",
0641:                                    new String[] {
0642:                                            request.getSqlShortForm(vdb
0643:                                                    .getSqlShortFormLength()),
0644:                                            e.getMessage() }));
0645:                        }
0646:                        if (logger.isDebugEnabled())
0647:                            logger.debug(Translate.get(
0648:                                    "loadbalancer.execute.on", new String[] {
0649:                                            String.valueOf(request.getId()),
0650:                                            backend.getName() }));
0651:                        success = true;
0652:                        return result;
0653:                    }
0654:                } catch (RuntimeException e) {
0655:                    String msg = Translate.get("loadbalancer.request.failed",
0656:                            new String[] {
0657:                                    request.getSqlShortForm(vdb
0658:                                            .getSqlShortFormLength()),
0659:                                    e.getMessage() });
0660:                    logger.fatal(msg, e);
0661:                    throw new SQLException(msg);
0662:                } finally {
0663:                    vdb.releaseReadLockBackendLists(); // Release the lock
0664:                    if (!success)
0665:                        recoveryLog.logRequestCompletion(request.getLogId(),
0666:                                false, request.getExecTimeInMs());
0667:                }
0668:            }
0669:
0670:            /**
0671:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
0672:             *      MetadataCache)
0673:             */
0674:            public ExecuteResult statementExecute(AbstractRequest request,
0675:                    MetadataCache metadataCache) throws SQLException,
0676:                    AllBackendsFailedException {
0677:                throw new NotImplementedException(
0678:                        "Statement.execute() is currently not supported with RAIDb-0");
0679:            }
0680:
0681:            /**
0682:             * Execute a read request on the selected backend.
0683:             * 
0684:             * @param request the request to execute
0685:             * @param backend the backend that will execute the request
0686:             * @param metadataCache the metadataCache if any or null
0687:             * @return the ControllerResultSet if (!success)
0688:             *         recoveryLog.logRequestCompletion(request.getLogId(), false,
0689:             *         request.getExecTimeInMs());
0690:             * @throws SQLException if an error occurs
0691:             */
0692:            protected ControllerResultSet executeRequestOnBackend(
0693:                    SelectRequest request, DatabaseBackend backend,
0694:                    MetadataCache metadataCache) throws SQLException {
0695:                // Handle macros
0696:                handleMacros(request);
0697:
0698:                // Ok, we have a backend, let's execute the request
0699:                AbstractConnectionManager cm = backend
0700:                        .getConnectionManager(request.getLogin());
0701:
0702:                // Sanity check
0703:                if (cm == null) {
0704:                    String msg = Translate.get(
0705:                            "loadbalancer.connectionmanager.not.found",
0706:                            new String[] { request.getLogin(),
0707:                                    backend.getName() });
0708:                    logger.error(msg);
0709:                    throw new SQLException(msg);
0710:                }
0711:
0712:                // Execute the query
0713:                if (request.isAutoCommit()) {
0714:                    ControllerResultSet rs = null;
0715:                    boolean badConnection;
0716:                    do {
0717:                        badConnection = false;
0718:                        // Use a connection just for this request
0719:                        PooledConnection c = null;
0720:                        try {
0721:                            c = cm.retrieveConnectionInAutoCommit(request);
0722:                        } catch (UnreachableBackendException e1) {
0723:                            logger
0724:                                    .error(Translate
0725:                                            .get(
0726:                                                    "loadbalancer.backend.disabling.unreachable",
0727:                                                    backend.getName()));
0728:                            disableBackend(backend, true);
0729:                            throw new SQLException(Translate.get(
0730:                                    "loadbalancer.backend.unreacheable",
0731:                                    backend.getName()));
0732:                        }
0733:
0734:                        // Sanity check
0735:                        if (c == null)
0736:                            throw new SQLException(Translate.get(
0737:                                    "loadbalancer.backend.no.connection",
0738:                                    backend.getName()));
0739:
0740:                        // Execute Query
0741:                        try {
0742:                            rs = executeStatementExecuteQueryOnBackend(request,
0743:                                    backend, null, c.getConnection(),
0744:                                    metadataCache);
0745:                            cm.releaseConnectionInAutoCommit(request, c);
0746:                        } catch (BadConnectionException e) { // Get rid of the bad connection
0747:                            cm.deleteConnection(c);
0748:                            badConnection = true;
0749:                        } catch (Throwable e) {
0750:                            cm.releaseConnectionInAutoCommit(request, c);
0751:                            throw new SQLException(
0752:                                    Translate
0753:                                            .get(
0754:                                                    "loadbalancer.request.failed.on.backend",
0755:                                                    new String[] {
0756:                                                            request
0757:                                                                    .getSqlShortForm(vdb
0758:                                                                            .getSqlShortFormLength()),
0759:                                                            backend.getName(),
0760:                                                            e.getMessage() }));
0761:                        }
0762:                    } while (badConnection);
0763:                    if (logger.isDebugEnabled())
0764:                        logger.debug(Translate.get("loadbalancer.execute.on",
0765:                                new String[] { String.valueOf(request.getId()),
0766:                                        backend.getName() }));
0767:                    return rs;
0768:                } else { // Inside a transaction
0769:                    Connection c;
0770:                    long tid = request.getTransactionId();
0771:
0772:                    try {
0773:                        c = backend
0774:                                .getConnectionForTransactionAndLazyBeginIfNeeded(
0775:                                        request, cm);
0776:                    } catch (UnreachableBackendException e1) {
0777:                        logger.error(Translate.get(
0778:                                "loadbalancer.backend.disabling.unreachable",
0779:                                backend.getName()));
0780:                        disableBackend(backend, true);
0781:                        throw new SQLException(Translate.get(
0782:                                "loadbalancer.backend.unreacheable", backend
0783:                                        .getName()));
0784:                    } catch (NoTransactionStartWhenDisablingException e) {
0785:                        String msg = Translate.get(
0786:                                "loadbalancer.backend.is.disabling",
0787:                                new String[] {
0788:                                        request.getSqlShortForm(vdb
0789:                                                .getSqlShortFormLength()),
0790:                                        backend.getName() });
0791:                        logger.error(msg);
0792:                        throw new SQLException(msg);
0793:                    }
0794:
0795:                    // Sanity check
0796:                    if (c == null)
0797:                        throw new SQLException(Translate.get(
0798:                                "loadbalancer.unable.retrieve.connection",
0799:                                new String[] { String.valueOf(tid),
0800:                                        backend.getName() }));
0801:
0802:                    // Execute Query
0803:                    ControllerResultSet rs = null;
0804:                    try {
0805:                        rs = executeStatementExecuteQueryOnBackend(request,
0806:                                backend, null, c, metadataCache);
0807:                    } catch (BadConnectionException e) { // Get rid of the bad connection
0808:                        cm.deleteConnection(tid);
0809:                        throw new SQLException(Translate.get(
0810:                                "loadbalancer.connection.failed", new String[] {
0811:                                        String.valueOf(tid), backend.getName(),
0812:                                        e.getMessage() }));
0813:                    } catch (Throwable e) {
0814:                        throw new SQLException(Translate.get(
0815:                                "loadbalancer.request.failed.on.backend",
0816:                                new String[] {
0817:                                        request.getSqlShortForm(vdb
0818:                                                .getSqlShortFormLength()),
0819:                                        backend.getName(), e.getMessage() }));
0820:                    }
0821:                    if (logger.isDebugEnabled())
0822:                        logger.debug(Translate.get(
0823:                                "loadbalancer.execute.transaction.on",
0824:                                new String[] { String.valueOf(tid),
0825:                                        String.valueOf(request.getId()),
0826:                                        backend.getName() }));
0827:                    return rs;
0828:                }
0829:            }
0830:
0831:            /**
0832:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecuteQuery(StoredProcedure,
0833:             *      MetadataCache)
0834:             */
0835:            public ControllerResultSet readOnlyCallableStatementExecuteQuery(
0836:                    StoredProcedure proc, MetadataCache metadataCache)
0837:                    throws SQLException {
0838:                throw new SQLException(
0839:                        "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0840:            }
0841:
0842:            /**
0843:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
0844:             *      org.continuent.sequoia.controller.cache.metadata.MetadataCache)
0845:             */
0846:            public ExecuteResult readOnlyCallableStatementExecute(
0847:                    StoredProcedure proc, MetadataCache metadataCache)
0848:                    throws SQLException {
0849:                throw new SQLException(
0850:                        "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0851:            }
0852:
0853:            /**
0854:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
0855:             *      MetadataCache)
0856:             */
0857:            public ControllerResultSet callableStatementExecuteQuery(
0858:                    StoredProcedure proc, MetadataCache metadataCache)
0859:                    throws SQLException {
0860:                throw new SQLException(
0861:                        "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0862:            }
0863:
0864:            /**
0865:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0866:             */
0867:            public ExecuteUpdateResult callableStatementExecuteUpdate(
0868:                    StoredProcedure proc) throws SQLException {
0869:                throw new SQLException(
0870:                        "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0871:            }
0872:
0873:            /**
0874:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
0875:             *      org.continuent.sequoia.controller.cache.metadata.MetadataCache)
0876:             */
0877:            public ExecuteResult callableStatementExecute(StoredProcedure proc,
0878:                    MetadataCache metadataCache)
0879:                    throws AllBackendsFailedException, SQLException {
0880:                throw new SQLException(
0881:                        "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0882:            }
0883:
0884:            /**
0885:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
0886:             */
0887:            public ControllerResultSet getPreparedStatementGetMetaData(
0888:                    AbstractRequest request) throws SQLException {
0889:                // Choose a backend
0890:                try {
0891:                    vdb.acquireReadLockBackendLists();
0892:                } catch (InterruptedException e) {
0893:                    String msg = Translate.get(
0894:                            "loadbalancer.backendlist.acquire.readlock.failed",
0895:                            e);
0896:                    logger.error(msg);
0897:                    throw new SQLException(msg);
0898:                }
0899:
0900:                /*
0901:                 * The backend that will execute the query
0902:                 */
0903:                DatabaseBackend backend = null;
0904:
0905:                // Note that vdb lock is released in the finally clause of this try/catch
0906:                // block
0907:                try {
0908:                    ArrayList backends = vdb.getBackends();
0909:                    int size = backends.size();
0910:
0911:                    if (size == 0)
0912:                        throw new NoMoreBackendException(Translate.get(
0913:                                "loadbalancer.execute.no.backend.available",
0914:                                request.getId()));
0915:
0916:                    // Choose the first available backend
0917:                    for (int i = 0; i < size; i++) {
0918:                        DatabaseBackend b = (DatabaseBackend) backends.get(i);
0919:                        if (b.isReadEnabled()) {
0920:                            backend = b;
0921:                            break;
0922:                        }
0923:                    }
0924:                } catch (Throwable e) {
0925:                    String msg = Translate.get(
0926:                            "loadbalancer.execute.find.backend.failed",
0927:                            new String[] {
0928:                                    request.getSqlShortForm(vdb
0929:                                            .getSqlShortFormLength()),
0930:                                    e.getMessage() });
0931:                    logger.error(msg, e);
0932:                    throw new SQLException(msg);
0933:                } finally {
0934:                    vdb.releaseReadLockBackendLists();
0935:                }
0936:
0937:                if (backend == null)
0938:                    throw new NoMoreBackendException(Translate.get(
0939:                            "loadbalancer.execute.no.backend.enabled", request
0940:                                    .getId()));
0941:
0942:                // Ok, we have a backend, let's execute the request
0943:                AbstractConnectionManager cm = backend
0944:                        .getConnectionManager(request.getLogin());
0945:
0946:                // Sanity check
0947:                if (cm == null) {
0948:                    String msg = Translate.get(
0949:                            "loadbalancer.connectionmanager.not.found",
0950:                            new String[] { request.getLogin(),
0951:                                    backend.getName() });
0952:                    logger.error(msg);
0953:                    throw new SQLException(msg);
0954:                }
0955:
0956:                // Execute the query
0957:                if (request.isAutoCommit()) {
0958:                    ControllerResultSet rs = null;
0959:                    boolean badConnection;
0960:                    do {
0961:                        badConnection = false;
0962:                        // Use a connection just for this request
0963:                        PooledConnection c = null;
0964:                        try {
0965:                            c = cm.retrieveConnectionInAutoCommit(request);
0966:                        } catch (UnreachableBackendException e1) {
0967:                            logger
0968:                                    .error(Translate
0969:                                            .get(
0970:                                                    "loadbalancer.backend.disabling.unreachable",
0971:                                                    backend.getName()));
0972:                            disableBackend(backend, true);
0973:                            // Retry on a different backend
0974:                            return getPreparedStatementGetMetaData(request);
0975:                        }
0976:
0977:                        // Sanity check
0978:                        if (c == null)
0979:                            throw new SQLException(Translate.get(
0980:                                    "loadbalancer.backend.no.connection",
0981:                                    backend.getName()));
0982:
0983:                        // Execute Query
0984:                        try {
0985:                            rs = preparedStatementGetMetaDataOnBackend(request
0986:                                    .getSqlOrTemplate(), backend, c
0987:                                    .getConnection());
0988:                            cm.releaseConnectionInAutoCommit(request, c);
0989:                        } catch (SQLException e) {
0990:                            cm.releaseConnectionInAutoCommit(request, c);
0991:                            throw SQLExceptionFactory
0992:                                    .getSQLException(
0993:                                            e,
0994:                                            Translate
0995:                                                    .get(
0996:                                                            "loadbalancer.request.failed.on.backend",
0997:                                                            new String[] {
0998:                                                                    request
0999:                                                                            .getSqlShortForm(vdb
1000:                                                                                    .getSqlShortFormLength()),
1001:                                                                    backend
1002:                                                                            .getName(),
1003:                                                                    e
1004:                                                                            .getMessage() }));
1005:                        } catch (BadConnectionException e) { // Get rid of the bad connection
1006:                            cm.deleteConnection(c);
1007:                            badConnection = true;
1008:                        } catch (Throwable e) {
1009:                            cm.releaseConnectionInAutoCommit(request, c);
1010:                            throw new SQLException(
1011:                                    Translate
1012:                                            .get(
1013:                                                    "loadbalancer.request.failed.on.backend",
1014:                                                    new String[] {
1015:                                                            request
1016:                                                                    .getSqlShortForm(vdb
1017:                                                                            .getSqlShortFormLength()),
1018:                                                            backend.getName(),
1019:                                                            e.getMessage() }));
1020:                        }
1021:                    } while (badConnection);
1022:                    if (logger.isDebugEnabled())
1023:                        logger.debug(Translate.get("loadbalancer.execute.on",
1024:                                new String[] { String.valueOf(request.getId()),
1025:                                        backend.getName() }));
1026:                    return rs;
1027:                } else { // Inside a transaction
1028:                    Connection c;
1029:                    long tid = request.getTransactionId();
1030:
1031:                    try {
1032:                        c = backend
1033:                                .getConnectionForTransactionAndLazyBeginIfNeeded(
1034:                                        request, cm);
1035:                    } catch (UnreachableBackendException e1) {
1036:                        logger.error(Translate.get(
1037:                                "loadbalancer.backend.disabling.unreachable",
1038:                                backend.getName()));
1039:                        disableBackend(backend, true);
1040:                        throw new SQLException(Translate.get(
1041:                                "loadbalancer.backend.unreacheable", backend
1042:                                        .getName()));
1043:                    } catch (NoTransactionStartWhenDisablingException e) {
1044:                        String msg = Translate.get(
1045:                                "loadbalancer.backend.is.disabling",
1046:                                new String[] {
1047:                                        request.getSqlShortForm(vdb
1048:                                                .getSqlShortFormLength()),
1049:                                        backend.getName() });
1050:                        logger.error(msg);
1051:                        throw new SQLException(msg);
1052:                    }
1053:
1054:                    // Sanity check
1055:                    if (c == null)
1056:                        throw new SQLException(Translate.get(
1057:                                "loadbalancer.unable.retrieve.connection",
1058:                                new String[] { String.valueOf(tid),
1059:                                        backend.getName() }));
1060:
1061:                    // Execute Query
1062:                    ControllerResultSet rs = null;
1063:                    try {
1064:                        rs = preparedStatementGetMetaDataOnBackend(request
1065:                                .getSqlOrTemplate(), backend, c);
1066:                    } catch (SQLException e) {
1067:                        throw e;
1068:                    } catch (BadConnectionException e) { // Connection failed, so did the transaction
1069:                        // Disable the backend.
1070:                        cm.deleteConnection(tid);
1071:                        String msg = Translate
1072:                                .get(
1073:                                        "loadbalancer.backend.disabling.connection.failure",
1074:                                        backend.getName());
1075:                        logger.error(msg);
1076:                        disableBackend(backend, true);
1077:                        throw new SQLException(msg);
1078:                    } catch (Throwable e) {
1079:                        throw new SQLException(Translate.get(
1080:                                "loadbalancer.request.failed.on.backend",
1081:                                new String[] {
1082:                                        request.getSqlShortForm(vdb
1083:                                                .getSqlShortFormLength()),
1084:                                        backend.getName(), e.getMessage() }));
1085:                    }
1086:                    if (logger.isDebugEnabled())
1087:                        logger.debug(Translate.get(
1088:                                "loadbalancer.execute.transaction.on",
1089:                                new String[] { String.valueOf(tid),
1090:                                        String.valueOf(request.getId()),
1091:                                        backend.getName() }));
1092:                    return rs;
1093:                }
1094:            }
1095:
1096:            /*
1097:             * Transaction management
1098:             */
1099:
1100:            /**
1101:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1102:             */
1103:            public void abort(TransactionMetaData tm) throws SQLException {
1104:                rollback(tm);
1105:            }
1106:
1107:            /**
1108:             * Begins a new transaction.
1109:             * 
1110:             * @param tm the transaction marker metadata
1111:             * @throws SQLException if an error occurs
1112:             */
1113:            public final void begin(TransactionMetaData tm) throws SQLException {
1114:            }
1115:
1116:            /**
1117:             * Commits a transaction.
1118:             * 
1119:             * @param tm the transaction marker metadata
1120:             * @throws SQLException if an error occurs
1121:             */
1122:            public void commit(TransactionMetaData tm) throws SQLException {
1123:                long tid = tm.getTransactionId();
1124:                Long lTid = new Long(tid);
1125:
1126:                long logId = 0;
1127:                // Log the request
1128:                if (recoveryLog != null)
1129:                    logId = recoveryLog.logCommit(tm);
1130:
1131:                // Acquire the lock
1132:                String requestDescription = "commit " + tid;
1133:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1134:                        requestDescription);
1135:
1136:                // Build the list of backends that need to commit this transaction
1137:                ArrayList commitList = new ArrayList(nbOfThreads);
1138:                for (int i = 0; i < nbOfThreads; i++) {
1139:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1140:                            .get(i);
1141:                    if (backend.isStartedTransaction(lTid))
1142:                        commitList.add(backend);
1143:                }
1144:
1145:                int nbOfThreadsToCommit = commitList.size();
1146:                CommitTask task = null;
1147:                if (nbOfThreadsToCommit != 0)
1148:                    task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1149:                            nbOfThreadsToCommit, tm);
1150:
1151:                // Post the task in the non-conflicting queues.
1152:                synchronized (enabledBackends) {
1153:                    for (int i = 0; i < nbOfThreadsToCommit; i++) {
1154:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1155:                                .get(i);
1156:                        backend.getTaskQueues()
1157:                                .addTaskToBackendTotalOrderQueue(task);
1158:                    }
1159:                }
1160:
1161:                // Release the lock
1162:                backendListLock.releaseRead();
1163:
1164:                // Check if someone had something to commit
1165:                if (task == null)
1166:                    return;
1167:
1168:                synchronized (task) {
1169:                    if (!task.hasCompleted())
1170:                        waitForTaskCompletion(tm.getTimeout(),
1171:                                requestDescription, task);
1172:
1173:                    if (task.getSuccess() == 0) { // All tasks failed
1174:
1175:                        // Notify failure in recovery log
1176:                        if (recoveryLog != null)
1177:                            recoveryLog.logRequestCompletion(logId, false, 0);
1178:
1179:                        List exceptions = task.getExceptions();
1180:                        if (exceptions == null)
1181:                            throw new SQLException(Translate.get(
1182:                                    "loadbalancer.commit.all.failed", tid));
1183:                        else {
1184:                            String errorMsg = Translate.get(
1185:                                    "loadbalancer.commit.failed.stack", tid)
1186:                                    + "\n";
1187:                            SQLException ex = SQLExceptionFactory
1188:                                    .getSQLException(exceptions, errorMsg);
1189:                            logger.error(ex.getMessage());
1190:                            throw ex;
1191:                        }
1192:                    }
1193:                }
1194:            }
1195:
1196:            /**
1197:             * Rollbacks a transaction.
1198:             * 
1199:             * @param tm the transaction marker metadata
1200:             * @throws SQLException if an error occurs
1201:             */
1202:            public void rollback(TransactionMetaData tm) throws SQLException {
1203:                long tid = tm.getTransactionId();
1204:                Long lTid = new Long(tid);
1205:
1206:                long logId = 0;
1207:                // Log the request
1208:                if (recoveryLog != null)
1209:                    logId = recoveryLog.logRollback(tm);
1210:
1211:                // Acquire the lock
1212:                String requestDescription = "rollback " + tid;
1213:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1214:                        requestDescription);
1215:
1216:                // Build the list of backends that need to rollback this transaction
1217:                ArrayList rollbackList = new ArrayList();
1218:                for (int i = 0; i < nbOfThreads; i++) {
1219:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1220:                            .get(i);
1221:                    if (backend.isStartedTransaction(lTid))
1222:                        rollbackList.add(backend);
1223:                }
1224:
1225:                int nbOfThreadsToRollback = rollbackList.size();
1226:                RollbackTask task = null;
1227:                if (nbOfThreadsToRollback != 0)
1228:                    task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1229:                            nbOfThreadsToRollback, tm);
1230:
1231:                // Post the task in the non-conflicting queues.
1232:                synchronized (enabledBackends) {
1233:                    for (int i = 0; i < nbOfThreadsToRollback; i++) {
1234:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1235:                                .get(i);
1236:                        backend.getTaskQueues()
1237:                                .addTaskToBackendTotalOrderQueue(task);
1238:                    }
1239:                }
1240:
1241:                // Release the lock
1242:                backendListLock.releaseRead();
1243:
1244:                // Check if someone had something to rollback
1245:                if (task == null)
1246:                    return;
1247:
1248:                synchronized (task) {
1249:                    if (!task.hasCompleted())
1250:                        waitForTaskCompletion(tm.getTimeout(),
1251:                                requestDescription, task);
1252:
1253:                    if (task.getSuccess() == 0) { // All tasks failed
1254:
1255:                        // Notify failure in recovery log
1256:                        if (recoveryLog != null)
1257:                            recoveryLog.logRequestCompletion(logId, false, 0);
1258:
1259:                        List exceptions = task.getExceptions();
1260:                        if (exceptions == null)
1261:                            throw new SQLException(Translate.get(
1262:                                    "loadbalancer.rollback.all.failed", tid));
1263:                        else {
1264:                            String errorMsg = Translate.get(
1265:                                    "loadbalancer.rollback.failed.stack", tid)
1266:                                    + "\n";
1267:                            SQLException ex = SQLExceptionFactory
1268:                                    .getSQLException(exceptions, errorMsg);
1269:                            logger.error(ex.getMessage());
1270:                            throw ex;
1271:                        }
1272:                    }
1273:                }
1274:            }
1275:
1276:            /**
1277:             * Rollback a transaction to a savepoint
1278:             * 
1279:             * @param tm The transaction marker metadata
1280:             * @param savepointName The name of the savepoint
1281:             * @throws SQLException if an error occurs
1282:             */
1283:            public void rollbackToSavepoint(TransactionMetaData tm,
1284:                    String savepointName) throws SQLException {
1285:                long tid = tm.getTransactionId();
1286:                Long lTid = new Long(tid);
1287:
1288:                long logId = 0;
1289:                // Log the request
1290:                if (recoveryLog != null)
1291:                    logId = recoveryLog.logRollbackToSavepoint(tm,
1292:                            savepointName);
1293:
1294:                // Acquire the lock
1295:                String requestDescription = "rollback " + savepointName + " "
1296:                        + tid;
1297:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1298:                        requestDescription);
1299:
1300:                // Build the list of backends that need to rollback this transaction
1301:                ArrayList rollbackList = new ArrayList();
1302:                for (int i = 0; i < nbOfThreads; i++) {
1303:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1304:                            .get(i);
1305:                    if (backend.isStartedTransaction(lTid))
1306:                        rollbackList.add(backend);
1307:                }
1308:
1309:                int nbOfThreadsToRollback = rollbackList.size();
1310:                RollbackToSavepointTask task = null;
1311:                if (nbOfThreadsToRollback != 0)
1312:                    task = new RollbackToSavepointTask(
1313:                            getNbToWait(nbOfThreadsToRollback),
1314:                            nbOfThreadsToRollback, tm, savepointName);
1315:
1316:                // Post the task in the non-conflicting queues.
1317:                synchronized (enabledBackends) {
1318:                    for (int i = 0; i < nbOfThreadsToRollback; i++) {
1319:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1320:                                .get(i);
1321:                        backend.getTaskQueues()
1322:                                .addTaskToBackendTotalOrderQueue(task);
1323:                    }
1324:                }
1325:
1326:                // Release the lock
1327:                backendListLock.releaseRead();
1328:
1329:                // Check if someone had something to rollback
1330:                if (task == null)
1331:                    return;
1332:
1333:                synchronized (task) {
1334:                    if (!task.hasCompleted())
1335:                        waitForTaskCompletion(tm.getTimeout(),
1336:                                requestDescription, task);
1337:
1338:                    if (task.getSuccess() == 0) { // All tasks failed
1339:
1340:                        // Notify failure in recovery log
1341:                        if (recoveryLog != null)
1342:                            recoveryLog.logRequestCompletion(logId, false, 0);
1343:
1344:                        List exceptions = task.getExceptions();
1345:                        if (exceptions == null)
1346:                            throw new SQLException(
1347:                                    Translate
1348:                                            .get(
1349:                                                    "loadbalancer.rollbacksavepoint.all.failed",
1350:                                                    new String[] {
1351:                                                            savepointName,
1352:                                                            String.valueOf(tid) }));
1353:                        else {
1354:                            String errorMsg = Translate
1355:                                    .get(
1356:                                            "loadbalancer.rollbacksavepoint.failed.stack",
1357:                                            new String[] { savepointName,
1358:                                                    String.valueOf(tid) })
1359:                                    + "\n";
1360:                            SQLException ex = SQLExceptionFactory
1361:                                    .getSQLException(exceptions, errorMsg);
1362:                            logger.error(ex.getMessage());
1363:                            throw ex;
1364:                        }
1365:                    }
1366:                }
1367:            }
1368:
1369:            /**
1370:             * Release a savepoint from a transaction
1371:             * 
1372:             * @param tm The transaction marker metadata
1373:             * @param savepointName The name of the savepoint ro release
1374:             * @throws SQLException if an error occurs
1375:             */
1376:            public void releaseSavepoint(TransactionMetaData tm,
1377:                    String savepointName) throws SQLException {
1378:                long tid = tm.getTransactionId();
1379:                Long lTid = new Long(tid);
1380:
1381:                long logId = 0;
1382:                // Log the request
1383:                if (recoveryLog != null)
1384:                    logId = recoveryLog.logReleaseSavepoint(tm, savepointName);
1385:
1386:                // Acquire the lock
1387:                String requestDescription = "release savepoint "
1388:                        + savepointName + " " + tid;
1389:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1390:                        requestDescription);
1391:
1392:                // Build the list of backends that need to rollback this transaction
1393:                ArrayList savepointList = new ArrayList();
1394:                for (int i = 0; i < nbOfThreads; i++) {
1395:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1396:                            .get(i);
1397:                    if (backend.isStartedTransaction(lTid))
1398:                        savepointList.add(backend);
1399:                }
1400:
1401:                int nbOfSavepoints = savepointList.size();
1402:                ReleaseSavepointTask task = null;
1403:                if (nbOfSavepoints != 0)
1404:                    task = new ReleaseSavepointTask(getNbToWait(nbOfThreads),
1405:                            nbOfThreads, tm, savepointName);
1406:
1407:                // Post the task in the non-conflicting queues.
1408:                synchronized (enabledBackends) {
1409:                    for (int i = 0; i < nbOfSavepoints; i++) {
1410:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1411:                                .get(i);
1412:                        backend.getTaskQueues()
1413:                                .addTaskToBackendTotalOrderQueue(task);
1414:                    }
1415:                }
1416:
1417:                // Release the lock
1418:                backendListLock.releaseRead();
1419:
1420:                // Check if someone had something to release
1421:                if (task == null)
1422:                    return;
1423:
1424:                synchronized (task) {
1425:                    if (!task.hasCompleted())
1426:                        waitForTaskCompletion(tm.getTimeout(),
1427:                                requestDescription, task);
1428:
1429:                    if (task.getSuccess() == 0) { // All tasks failed
1430:
1431:                        // Notify failure in recovery log
1432:                        if (recoveryLog != null)
1433:                            recoveryLog.logRequestCompletion(logId, false, 0);
1434:
1435:                        List exceptions = task.getExceptions();
1436:                        if (exceptions == null)
1437:                            throw new SQLException(Translate.get(
1438:                                    "loadbalancer.releasesavepoint.all.failed",
1439:                                    new String[] { savepointName,
1440:                                            String.valueOf(tid) }));
1441:                        else {
1442:                            String errorMsg = Translate
1443:                                    .get(
1444:                                            "loadbalancer.releasesavepoint.failed.stack",
1445:                                            new String[] { savepointName,
1446:                                                    String.valueOf(tid) })
1447:                                    + "\n";
1448:                            SQLException ex = SQLExceptionFactory
1449:                                    .getSQLException(exceptions, errorMsg);
1450:                            logger.error(ex.getMessage());
1451:                            throw ex;
1452:                        }
1453:                    }
1454:                }
1455:            }
1456:
1457:            /**
1458:             * Set a savepoint to a transaction.
1459:             * 
1460:             * @param tm The transaction marker metadata
1461:             * @param savepointName The name of the new savepoint
1462:             * @throws SQLException if an error occurs
1463:             */
1464:            public void setSavepoint(TransactionMetaData tm,
1465:                    String savepointName) throws SQLException {
1466:                long tid = tm.getTransactionId();
1467:                Long lTid = new Long(tid);
1468:
1469:                long logId = 0;
1470:                // Log the request
1471:                if (recoveryLog != null)
1472:                    logId = recoveryLog.logSetSavepoint(tm, savepointName);
1473:
1474:                // Acquire the lock
1475:                String requestDescription = "set savepoint " + savepointName
1476:                        + " " + tid;
1477:                int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1478:                        requestDescription);
1479:
1480:                // Build the list of backends that need to rollback this transaction
1481:                ArrayList savepointList = new ArrayList();
1482:                for (int i = 0; i < nbOfThreads; i++) {
1483:                    DatabaseBackend backend = (DatabaseBackend) enabledBackends
1484:                            .get(i);
1485:                    if (backend.isStartedTransaction(lTid))
1486:                        savepointList.add(backend);
1487:                }
1488:
1489:                int nbOfSavepoints = savepointList.size();
1490:                SavepointTask task = null;
1491:                if (nbOfSavepoints != 0)
1492:                    task = new SavepointTask(getNbToWait(nbOfThreads),
1493:                            nbOfThreads, tm, savepointName);
1494:
1495:                // Post the task in the non-conflicting queues.
1496:                synchronized (enabledBackends) {
1497:                    for (int i = 0; i < nbOfSavepoints; i++) {
1498:                        DatabaseBackend backend = (DatabaseBackend) enabledBackends
1499:                                .get(i);
1500:                        backend.getTaskQueues()
1501:                                .addTaskToBackendTotalOrderQueue(task);
1502:                    }
1503:                }
1504:
1505:                // Release the lock
1506:                backendListLock.releaseRead();
1507:
1508:                // Check if someone had something to release
1509:                if (task == null)
1510:                    return;
1511:
1512:                synchronized (task) {
1513:                    if (!task.hasCompleted())
1514:                        waitForTaskCompletion(tm.getTimeout(),
1515:                                requestDescription, task);
1516:
1517:                    if (task.getSuccess() == 0) { // All tasks failed
1518:
1519:                        // Notify failure in recovery log
1520:                        if (recoveryLog != null)
1521:                            recoveryLog.logRequestCompletion(logId, false, 0);
1522:
1523:                        List exceptions = task.getExceptions();
1524:                        if (exceptions == null)
1525:                            throw new SQLException(Translate.get(
1526:                                    "loadbalancer.setsavepoint.all.failed",
1527:                                    new String[] { savepointName,
1528:                                            String.valueOf(tid) }));
1529:                        else {
1530:                            String errorMsg = Translate.get(
1531:                                    "loadbalancer.setsavepoint.failed.stack",
1532:                                    new String[] { savepointName,
1533:                                            String.valueOf(tid) })
1534:                                    + "\n";
1535:                            SQLException ex = SQLExceptionFactory
1536:                                    .getSQLException(exceptions, errorMsg);
1537:                            logger.error(ex.getMessage());
1538:                            throw ex;
1539:                        }
1540:                    }
1541:                }
1542:            }
1543:
1544:            /**
1545:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1546:             *      long)
1547:             */
1548:            public void closePersistentConnection(String login,
1549:                    long persistentConnectionId) throws SQLException {
1550:                /*
1551:                 * We assume a synchronous execution and connection closing can only come
1552:                 * after all requests have been executed in that connection. We post to all
1553:                 * backends and let the task deal with whether that backend had a persistent
1554:                 * connection or not.
1555:                 */
1556:
1557:                String requestDescription = "closing persistent connection "
1558:                        + persistentConnectionId;
1559:                ClosePersistentConnectionTask task = null;
1560:                try {
1561:                    int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1562:                            requestDescription);
1563:
1564:                    task = new ClosePersistentConnectionTask(
1565:                            getNbToWait(nbOfThreads), nbOfThreads, login,
1566:                            persistentConnectionId);
1567:
1568:                    // Post the task in the non-conflicting queues.
1569:                    synchronized (enabledBackends) {
1570:                        for (int i = 0; i < nbOfThreads; i++) {
1571:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1572:                                    .get(i);
1573:                            backend.getTaskQueues()
1574:                                    .addTaskToBackendTotalOrderQueue(task);
1575:                        }
1576:                    }
1577:
1578:                    // Release the lock
1579:                    backendListLock.releaseRead();
1580:                } finally {
1581:                    if (task == null) { // Happens if we had a NoMoreBackendException
1582:                        task = new ClosePersistentConnectionTask(0, 0, login,
1583:                                persistentConnectionId);
1584:                    }
1585:                }
1586:            }
1587:
1588:            /**
1589:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1590:             *      long)
1591:             */
1592:            public void openPersistentConnection(String login,
1593:                    long persistentConnectionId) throws SQLException {
1594:                String requestDescription = "opening persistent connection "
1595:                        + persistentConnectionId;
1596:                int nbOfThreads = 0;
1597:                OpenPersistentConnectionTask task = null;
1598:                try {
1599:                    nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1600:                            requestDescription);
1601:
1602:                    task = new OpenPersistentConnectionTask(
1603:                            getNbToWait(nbOfThreads), nbOfThreads, login,
1604:                            persistentConnectionId);
1605:
1606:                    // Post the task in the non-conflicting queues.
1607:                    synchronized (enabledBackends) {
1608:                        for (int i = 0; i < nbOfThreads; i++) {
1609:                            DatabaseBackend backend = (DatabaseBackend) enabledBackends
1610:                                    .get(i);
1611:                            backend.getTaskQueues()
1612:                                    .addTaskToBackendTotalOrderQueue(task);
1613:                        }
1614:                    }
1615:
1616:                    // Release the lock
1617:                    backendListLock.releaseRead();
1618:
1619:                    synchronized (task) {
1620:                        if (!task.hasCompleted())
1621:                            try {
1622:                                waitForTaskCompletion(0, requestDescription,
1623:                                        task);
1624:                            } catch (SQLException ignore) {
1625:                            }
1626:                    }
1627:                } finally {
1628:                    if (logger.isDebugEnabled())
1629:                        logger.debug(requestDescription + " completed on "
1630:                                + nbOfThreads + " backends.");
1631:                }
1632:            }
1633:
1634:            /**
1635:             * Enables a Backend that was previously disabled.
1636:             * <p>
1637:             * Ask the corresponding connection manager to initialize the connections if
1638:             * needed.
1639:             * <p>
1640:             * No sanity checks are performed by this function.
1641:             * 
1642:             * @param db the database backend to enable
1643:             * @param writeEnabled True if the backend must be enabled for writes
1644:             * @throws SQLException if an error occurs
1645:             */
1646:            public void enableBackend(DatabaseBackend db, boolean writeEnabled)
1647:                    throws SQLException {
1648:                if (!db.isInitialized())
1649:                    db.initializeConnections();
1650:
1651:                if (writeEnabled && db.isWriteCanBeEnabled()) {
1652:                    // Create the new backend task queues
1653:                    db.setTaskQueues(new BackendTaskQueues(db,
1654:                            waitForCompletionPolicy, this .vdb
1655:                                    .getRequestManager()));
1656:                    db.startWorkerThreads(this );
1657:                    db.enableWrite();
1658:                }
1659:
1660:                db.enableRead();
1661:                try {
1662:                    backendListLock.acquireWrite();
1663:                } catch (InterruptedException e) {
1664:                    logger
1665:                            .error(
1666:                                    "Error while acquiring write lock in enableBackend",
1667:                                    e);
1668:                }
1669:                synchronized (enabledBackends) {
1670:                    enabledBackends.add(db);
1671:                }
1672:                backendListLock.releaseWrite();
1673:            }
1674:
1675:            /**
1676:             * Disables a backend that was previously enabled.
1677:             * <p>
1678:             * Ask the corresponding connection manager to finalize the connections if
1679:             * needed.
1680:             * <p>
1681:             * No sanity checks are performed by this function.
1682:             * 
1683:             * @param db the database backend to disable
1684:             * @param forceDisable true if disabling must be forced on the backend
1685:             * @throws SQLException if an error occurs
1686:             */
1687:            public void disableBackend(DatabaseBackend db, boolean forceDisable)
1688:                    throws SQLException {
1689:                if (!db.disable()) {
1690:                    // Another thread has already started the disable process
1691:                    return;
1692:                }
1693:                synchronized (this ) {
1694:                    if (forceDisable)
1695:                        db.shutdownConnectionManagers();
1696:                    db.terminateWorkerThreads();
1697:
1698:                    if (db.isInitialized())
1699:                        db.finalizeConnections();
1700:
1701:                    try {
1702:                        backendListLock.acquireWrite();
1703:                    } catch (InterruptedException e) {
1704:                        logger
1705:                                .error(
1706:                                        "Error while acquiring write lock in enableBackend",
1707:                                        e);
1708:                    }
1709:                    synchronized (enabledBackends) {
1710:                        enabledBackends.remove(db);
1711:                        if (enabledBackends.isEmpty()) {
1712:                            // Cleanup schema for any remaining locks
1713:                            this .vdb.getRequestManager().setDatabaseSchema(
1714:                                    null, false);
1715:                        }
1716:                    }
1717:                    backendListLock.releaseWrite();
1718:                }
1719:            }
1720:
1721:            /**
1722:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#setWeight(String,
1723:             *      int)
1724:             */
1725:            public void setWeight(String name, int w) throws SQLException {
1726:                throw new SQLException(
1727:                        "Weight is not supported with this load balancer");
1728:            }
1729:
1730:            /*
1731:             * Debug/Monitoring
1732:             */
1733:
1734:            /**
1735:             * Get information about the Request load balancer
1736:             * 
1737:             * @return <code>String</code> containing information
1738:             */
1739:            public String getInformation() {
1740:                return "RAIDb-0 Request load balancer\n";
1741:            }
1742:
1743:            /**
1744:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
1745:             */
1746:            public String getXmlImpl() {
1747:                StringBuffer info = new StringBuffer();
1748:                info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
1749:                createTablePolicy.getXml();
1750:                info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
1751:                return info.toString();
1752:            }
1753:
1754:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.