Source Code Cross Referenced for AbstractLoadBalancer.java in  » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » loadbalancer » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.loadbalancer 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004:         * Science And Control (INRIA).
0005:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006:         * Copyright (C) 2005-2006 Continuent, Inc.
0007:         * Contact: sequoia@continuent.org
0008:         * 
0009:         * Licensed under the Apache License, Version 2.0 (the "License");
0010:         * you may not use this file except in compliance with the License.
0011:         * You may obtain a copy of the License at
0012:         * 
0013:         * http://www.apache.org/licenses/LICENSE-2.0
0014:         * 
0015:         * Unless required by applicable law or agreed to in writing, software
0016:         * distributed under the License is distributed on an "AS IS" BASIS,
0017:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018:         * See the License for the specific language governing permissions and
0019:         * limitations under the License. 
0020:         *
0021:         * Initial developer(s): Emmanuel Cecchet.
0022:         * Contributor(s): Vadim Kassin, Jaco Swart, Jean-Bernard van Zuylen
0023:         */package org.continuent.sequoia.controller.loadbalancer;
0024:
0025:        import java.sql.CallableStatement;
0026:        import java.sql.Connection;
0027:        import java.sql.PreparedStatement;
0028:        import java.sql.ResultSet;
0029:        import java.sql.SQLException;
0030:        import java.sql.SQLWarning;
0031:        import java.sql.Statement;
0032:        import java.util.ArrayList;
0033:        import java.util.Iterator;
0034:        import java.util.LinkedList;
0035:        import java.util.List;
0036:
0037:        import org.continuent.sequoia.common.exceptions.BadConnectionException;
0038:        import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0039:        import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0040:        import org.continuent.sequoia.common.i18n.Translate;
0041:        import org.continuent.sequoia.common.locks.ReadPrioritaryFIFOWriteLock;
0042:        import org.continuent.sequoia.common.log.Trace;
0043:        import org.continuent.sequoia.common.protocol.PreparedStatementSerialization;
0044:        import org.continuent.sequoia.common.sql.filters.MacrosHandler;
0045:        import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0046:        import org.continuent.sequoia.common.xml.XmlComponent;
0047:        import org.continuent.sequoia.controller.backend.DatabaseBackend;
0048:        import org.continuent.sequoia.controller.backend.DriverCompliance;
0049:        import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0050:        import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0051:        import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0052:        import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0053:        import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0054:        import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0055:        import org.continuent.sequoia.controller.connection.PooledConnection;
0056:        import org.continuent.sequoia.controller.core.ControllerConstants;
0057:        import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0058:        import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0059:        import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0060:        import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0061:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0062:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0063:        import org.continuent.sequoia.controller.requests.CreateRequest;
0064:        import org.continuent.sequoia.controller.requests.SelectRequest;
0065:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0066:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0067:        import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendWritesMessage;
0068:
0069:        /**
0070:         * The Request Load Balancer should implement the load balancing of the requests
0071:         * among the backend nodes.
0072:         * <p>
0073:         * The requests come from the Request Controller and are sent to the Connection
0074:         * Managers.
0075:         * 
0076:         * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0077:         * @author <a href="mailto:vadim@kase.kz">Vadim Kassin </a>
0078:         * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
0079:         * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0080:         *         </a>
0081:         * @version 1.0
0082:         */
0083:        public abstract class AbstractLoadBalancer implements  XmlComponent {
0084:
0085:            //
0086:            // How is the code organized?
0087:            //
0088:            // 1. Member variables/Constructor
0089:            // 2. Getter/Setter (possibly in alphabetical order)
0090:            // 3. Request handling
0091:            // 4. Transaction management
0092:            // 5. Backend management
0093:            // 6. Debug/Monitoring
0094:            //
0095:
0096:            // Virtual Database this load balancer is attached to.
0097:            protected VirtualDatabase vdb;
0098:            protected RecoveryLog recoveryLog; // not assigned by the constructor; installed through setRecoveryLog()
0099:            protected int raidbLevel; // RAIDb level handed to the constructor
0100:            protected int parsingGranularity; // query parsing granularity handed to the constructor
0101:            /** Reference to distributed virtual database total order queue */
0102:            protected LinkedList totalOrderQueue; // taken from vdb.getTotalOrderQueue(); the queue helpers below null-check it before use
0103:
0104:            protected MacrosHandler macroHandler; // optional: handleMacros() does nothing while this is null
0105:
0106:            /**
0107:             * List of enabled backends (includes backends in either ENABLED or DISABLING
0108:             * state).
0109:             * 
0110:             * @see org.continuent.sequoia.common.jmx.management.BackendState
0111:             */
0112:            protected ArrayList enabledBackends;
0113:            protected ReadPrioritaryFIFOWriteLock backendListLock = new ReadPrioritaryFIFOWriteLock(); // read-locked before enabledBackends is inspected
0114:
0115:            /** Should we wait for all backends to commit before returning ? */
0116:            public WaitForCompletionPolicy waitForCompletionPolicy; // consulted by getNbToWait(); not assigned in the constructor
0117:
0118:            private static int defaultTransactionIsolationLevel; // static: one value shared by every load balancer instance
0119:
0120:            protected static Trace logger = Trace
0121:                    .getLogger("org.continuent.sequoia.controller.loadbalancer");
0122:
0123:            /**
0124:             * Generic constructor that sets some member variables and checks that
0125:             * backends are in the disabled state
0126:             * 
0127:             * @param vdb The virtual database this load balancer belongs to
0128:             * @param raidbLevel The RAIDb level of this load balancer
0129:             * @param parsingGranularity The parsing granularity needed by this load
0130:             *          balancer
0131:             */
0132:            protected AbstractLoadBalancer(VirtualDatabase vdb, int raidbLevel,
0133:                    int parsingGranularity) throws SQLException {
0134:                this .raidbLevel = raidbLevel;
0135:                this .parsingGranularity = parsingGranularity;
0136:                this .vdb = vdb;
0137:                this .totalOrderQueue = vdb.getTotalOrderQueue();
0138:                this .enabledBackends = new ArrayList();
0139:                try {
0140:                    vdb.acquireReadLockBackendLists();
0141:                } catch (InterruptedException e) {
0142:                    String msg = Translate.get(
0143:                            "loadbalancer.backendlist.acquire.readlock.failed",
0144:                            e);
0145:                    logger.error(msg);
0146:                    throw new SQLException(msg);
0147:                }
0148:                int size = vdb.getBackends().size();
0149:                ArrayList backends = vdb.getBackends();
0150:                for (int i = 0; i < size; i++) {
0151:                    DatabaseBackend backend = (DatabaseBackend) backends.get(i);
0152:                    if (backend.isReadEnabled() || backend.isWriteEnabled()) {
0153:                        if (logger.isWarnEnabled())
0154:                            logger
0155:                                    .warn(Translate
0156:                                            .get(
0157:                                                    "loadbalancer.constructor.backends.not.disabled",
0158:                                                    backend.getName()));
0159:                        try {
0160:                            disableBackend(backend, true);
0161:                        } catch (Exception e) { // Set the disabled state anyway
0162:                            backend.disable();
0163:                        }
0164:                    }
0165:                }
0166:                vdb.releaseReadLockBackendLists();
0167:            }
0168:
0169:            //
0170:            // Getter/Setter methods
0171:            //
0172:
0173:            /**
0174:             * Returns the defaultTransactionIsolationLevel value.
0175:             * 
0176:             * @return Returns the defaultTransactionIsolationLevel.
0177:             */
0178:            public static final int getDefaultTransactionIsolationLevel() {
0179:                return defaultTransactionIsolationLevel;
0180:            }
0181:
0182:            /**
0183:             * Sets the defaultTransactionIsolationLevel value.
0184:             * 
0185:             * @param defaultTransactionIsolationLevel The
0186:             *          defaultTransactionIsolationLevel to set.
0187:             */
0188:            public final void setDefaultTransactionIsolationLevel(
0189:                    int defaultTransactionIsolationLevel) {
0190:                AbstractLoadBalancer.defaultTransactionIsolationLevel = defaultTransactionIsolationLevel;
0191:            }
0192:
0193:            /**
0194:             * This sets the macro handler for this load balancer. Handling macros
0195:             * prevents different backends to generate different values when interpreting
0196:             * the macros which could result in data inconsitencies.
0197:             * 
0198:             * @param handler <code>MacrosHandler</code> instance
0199:             */
0200:            public void setMacroHandler(MacrosHandler handler) {
0201:                this .macroHandler = handler;
0202:            }
0203:
0204:            /**
0205:             * Get the needed query parsing granularity.
0206:             * 
0207:             * @return needed query parsing granularity
0208:             */
0209:            public int getParsingGranularity() {
0210:                return parsingGranularity;
0211:            }
0212:
0213:            /**
0214:             * Returns the RAIDbLevel.
0215:             * 
0216:             * @return int the RAIDb level
0217:             */
0218:            public int getRAIDbLevel() {
0219:                return raidbLevel;
0220:            }
0221:
0222:            /**
0223:             * Returns the recoveryLog value.
0224:             * 
0225:             * @return Returns the recoveryLog.
0226:             */
0227:            public final RecoveryLog getRecoveryLog() {
0228:                return recoveryLog;
0229:            }
0230:
0231:            /**
0232:             * Sets the recoveryLog value.
0233:             * 
0234:             * @param recoveryLog The recoveryLog to set.
0235:             */
0236:            public final void setRecoveryLog(RecoveryLog recoveryLog) {
0237:                this .recoveryLog = recoveryLog;
0238:            }
0239:
0240:            /**
0241:             * Associate a weight to a backend identified by its logical name.
0242:             * 
0243:             * @param name the backend name
0244:             * @param w the weight
0245:             * @throws SQLException if an error occurs
0246:             */
0247:            public void setWeight(String name, int w) throws SQLException {
0248:                throw new SQLException(
0249:                        "Weight is not supported by this load balancer");
0250:            }
0251:
0252:            //
0253:            // Utility functions
0254:            //
0255:
0256:            /**
0257:             * Acquire the given lock and check the number of threads. Throw a
0258:             * NoMoreBackendException if no thread is available else returns the number of
0259:             * threads.
0260:             * 
0261:             * @param request object to remove from the total order queue in case no
0262:             *          backend is available
0263:             * @param requestDescription description of the request to put in the error
0264:             *          message in case of an error
0265:             * @return the number of threads in the acquired list
0266:             * @throws SQLException if there was a problem to acquire the lock on the
0267:             *           enabled backend list
0268:             * @throws NoMoreBackendException if no backends are available anymore
0269:             */
0270:            protected int acquireLockAndCheckNbOfThreads(Object request,
0271:                    String requestDescription) throws SQLException,
0272:                    NoMoreBackendException {
0273:                try {
0274:                    backendListLock.acquireRead();
0275:                } catch (InterruptedException e) {
0276:                    String msg = Translate.get(
0277:                            "loadbalancer.backendlist.acquire.readlock.failed",
0278:                            e);
0279:                    logger.error(msg);
0280:                    throw new SQLException(msg);
0281:                }
0282:
0283:                int nbOfThreads = enabledBackends.size();
0284:                if (nbOfThreads == 0) {
0285:                    releaseLockAndUnlockNextQuery(request);
0286:                    throw new NoMoreBackendException(Translate
0287:                            .get("loadbalancer.backendlist.empty"));
0288:                } else {
0289:                    if (logger.isDebugEnabled())
0290:                        logger.debug(Translate.get(
0291:                                "loadbalancer.execute.on.several",
0292:                                new String[] { requestDescription,
0293:                                        String.valueOf(nbOfThreads) }));
0294:                }
0295:                return nbOfThreads;
0296:            }
0297:
0298:            /**
0299:             * Returns the number of nodes to wait for according to the defined
0300:             * <code>waitForCompletion</code> policy.
0301:             * 
0302:             * @param nbOfThreads total number of threads
0303:             * @return int number of threads to wait for
0304:             */
0305:            protected int getNbToWait(int nbOfThreads) {
0306:                int nbToWait;
0307:                switch (waitForCompletionPolicy.getPolicy()) {
0308:                case WaitForCompletionPolicy.FIRST:
0309:                    nbToWait = 1;
0310:                    break;
0311:                case WaitForCompletionPolicy.MAJORITY:
0312:                    nbToWait = nbOfThreads / 2 + 1;
0313:                    break;
0314:                case WaitForCompletionPolicy.ALL:
0315:                    nbToWait = nbOfThreads;
0316:                    break;
0317:                default:
0318:                    logger.warn(Translate
0319:                            .get("loadbalancer.waitforcompletion.unsupported"));
0320:                    nbToWait = nbOfThreads;
0321:                    break;
0322:                }
0323:                return nbToWait;
0324:            }
0325:
0326:            /**
0327:             * Interprets the macros in the request (depending on the
0328:             * <code>MacroHandler</code> set for this class) and modify either the
0329:             * skeleton or the query itself. Note that the given object is directly
0330:             * modified.
0331:             * 
0332:             * @param request the request to process
0333:             */
0334:            public void handleMacros(AbstractRequest request) {
0335:                if (macroHandler == null)
0336:                    return;
0337:
0338:                // Do not handle macros for requests that don't need it.
0339:                if (!request.needsMacroProcessing())
0340:                    return;
0341:
0342:                macroHandler.processMacros(request);
0343:            }
0344:
0345:            /**
0346:             * Release the backend list lock and remove the current query from the head of
0347:             * the total order queue to unlock the next query.
0348:             * 
0349:             * @param currentQuery the current query to remove from the total order queue
0350:             */
0351:            protected void releaseLockAndUnlockNextQuery(Object currentQuery) {
0352:                backendListLock.releaseRead();
0353:
0354:                // Unblock next query from total order queue
0355:                removeObjectFromAndNotifyTotalOrderQueue(currentQuery);
0356:            }
0357:
0358:            /**
0359:             * Remove an entry of the total order queue (usually the head) and notify the
0360:             * queue so that the next queries can be scheduled.
0361:             * 
0362:             * @param request Object that should be removed from the total order queue
0363:             */
0364:            public void removeObjectFromAndNotifyTotalOrderQueue(Object request) {
0365:                if ((totalOrderQueue != null) && (request != null)) {
0366:                    synchronized (totalOrderQueue) {
0367:                        try {
0368:                            if (totalOrderQueue.remove(request)) {
0369:                                if (logger.isDebugEnabled())
0370:                                    logger.debug("Removed " + request
0371:                                            + " from total order queue");
0372:                                totalOrderQueue.notifyAll();
0373:                            } else if (logger.isDebugEnabled()) {
0374:                                logger.debug(request
0375:                                        + " was not in the total order queue");
0376:                            }
0377:                        } catch (RuntimeException e) {
0378:                            logger.warn("Unable to remove request " + request
0379:                                    + " from total order queue", e);
0380:                        }
0381:                    }
0382:                }
0383:            }
0384:
0385:            /**
0386:             * Wait for the completion of the given task. Note that this method must be
0387:             * called within a synchronized block on the task.
0388:             * 
0389:             * @param timeout timeout in ms for this task
0390:             * @param requestDescription description of the request to put in the error
0391:             *          message in case of a timeout
0392:             * @param task the task to wait for completion
0393:             * @throws SQLException if the timeout has expired
0394:             */
0395:            public static void waitForTaskCompletion(long timeout,
0396:                    String requestDescription, AbstractTask task)
0397:                    throws SQLException {
0398:                // Wait for completion (notified by the task)
0399:                try {
0400:                    // Wait on task
0401:                    if (timeout > 0) {
0402:                        long start = System.currentTimeMillis();
0403:                        task.wait(timeout);
0404:                        long end = System.currentTimeMillis();
0405:                        long remaining = timeout - (end - start);
0406:                        if (remaining <= 0) {
0407:                            if (task.setExpiredTimeout()) { // Task will be ignored by all backends
0408:                                String msg = Translate
0409:                                        .get("loadbalancer.request.timeout",
0410:                                                new String[] {
0411:                                                        requestDescription,
0412:                                                        String.valueOf(task
0413:                                                                .getSuccess()),
0414:                                                        String.valueOf(task
0415:                                                                .getFailed()) });
0416:
0417:                                logger.warn(msg);
0418:                                throw new SQLException(msg);
0419:                            }
0420:                            // else task execution already started, to late to cancel
0421:                        }
0422:                        // No need to update request timeout since the execution is finished
0423:                    } else
0424:                        task.wait();
0425:                } catch (InterruptedException e) {
0426:                    if (task.setExpiredTimeout()) { // Task will be ignored by all backends
0427:                        String msg = Translate.get(
0428:                                "loadbalancer.request.timeout", new String[] {
0429:                                        requestDescription,
0430:                                        String.valueOf(task.getSuccess()),
0431:                                        String.valueOf(task.getFailed()) });
0432:
0433:                        logger.warn(msg);
0434:                        throw new SQLException(msg);
0435:                    }
0436:                    // else task execution already started, to late to cancel
0437:                }
0438:            }
0439:
0440:            /**
0441:             * If we are executing in a distributed virtual database, we have to make sure
0442:             * that we post the query in the queue following the total order. This method
0443:             * does not remove the request from the total order queue. You have to call
0444:             * removeHeadFromAndNotifyTotalOrderQueue() to do so.
0445:             * 
0446:             * @param request the request to wait for (can be any object but usually a
0447:             *          DistributedRequest, Commit or Rollback)
0448:             * @param errorIfNotFound true if an error message should be logged if the
0449:             *          request is not found in the total order queue
0450:             * @return true if the element was found and wait has succeeded, false
0451:             *         otherwise
0452:             * @see #removeHeadFromAndNotifyTotalOrderQueue(Object)
0453:             */
0454:            public boolean waitForTotalOrder(Object request,
0455:                    boolean errorIfNotFound) {
0456:                if (totalOrderQueue != null) {
0457:                    synchronized (totalOrderQueue) {
0458:                        int index = totalOrderQueue.indexOf(request);
0459:                        while (index > 0) {
0460:                            if (logger.isDebugEnabled())
0461:                                logger.debug("Waiting for " + index
0462:                                        + " queries to execute (current is "
0463:                                        + totalOrderQueue.get(0) + ")");
0464:
0465:                            // All suspended requests can be bypassed
0466:                            boolean foundNonSuspendedRequest = false;
0467:                            for (int i = 0; i < index; i++) {
0468:                                if (!vdb.getRequestManager().getScheduler()
0469:                                        .isSuspendedRequest(
0470:                                                totalOrderQueue.get(i))) {
0471:                                    foundNonSuspendedRequest = true;
0472:                                    break;
0473:                                }
0474:                            }
0475:                            if (!foundNonSuspendedRequest) {
0476:                                index = 0;
0477:                                break;
0478:                            }
0479:
0480:                            try {
0481:                                totalOrderQueue.wait();
0482:                            } catch (InterruptedException ignore) {
0483:                            }
0484:                            index = totalOrderQueue.indexOf(request);
0485:                        }
0486:                        if (index == -1) {
0487:                            if (errorIfNotFound)
0488:                                logger
0489:                                        .error("Request was not found in total order queue, posting out of order ("
0490:                                                + request + ")");
0491:                            return false;
0492:                        } else
0493:                            return true;
0494:                    }
0495:                }
0496:                return false;
0497:            }
0498:
0499:            /**
0500:             * This will block the given request if there are any suspending task in
0501:             * progress (before the request in the total order queue).
0502:             * 
0503:             * @param request the request that we are processing
0504:             */
0505:            public void waitForSuspendWritesToComplete(AbstractRequest request) {
0506:                if (totalOrderQueue != null) {
0507:                    synchronized (totalOrderQueue) {
0508:                        boolean hasToWait = true;
0509:                        while (hasToWait) {
0510:                            hasToWait = false;
0511:                            // Checking total order queue to see if there is an
0512:                            // SuspendWritesMessage before this request.
0513:                            // If this is the case, this request will have to wait.
0514:                            for (Iterator iter = totalOrderQueue.iterator(); iter
0515:                                    .hasNext();) {
0516:                                Object elem = iter.next();
0517:                                if (elem instanceof  SuspendWritesMessage) {
0518:                                    // Found a SuspendWritesMessage, so wait...
0519:                                    hasToWait = true;
0520:                                    break;
0521:                                } else if (elem instanceof  AbstractRequest) {
0522:                                    // Found the request itself, let'go then...
0523:                                    AbstractRequest req = (AbstractRequest) elem;
0524:                                    if (req == request)
0525:                                        break;
0526:                                }
0527:                            }
0528:                            if (hasToWait)
0529:                                try {
0530:                                    totalOrderQueue.wait();
0531:                                } catch (InterruptedException ignore) {
0532:                                }
0533:                        }
0534:                    }
0535:                }
0536:            }
0537:
0538:            //
0539:            // Request Handling
0540:            //
0541:
0542:            /**
0543:             * Perform a read request. It is up to the implementation to choose to which
0544:             * backend node(s) this request should be sent.
0545:             * 
0546:             * @param request an <code>SelectRequest</code>
0547:             * @param metadataCache MetadataCache (null if none)
0548:             * @return the corresponding <code>ControllerResultSet</code>
0549:             * @exception SQLException if an error occurs
0550:             * @throws AllBackendsFailedException if all backends failed to execute the
0551:             *           request
0552:             */
0553:            public abstract ControllerResultSet statementExecuteQuery(
0554:                    SelectRequest request, MetadataCache metadataCache)
0555:                    throws SQLException, AllBackendsFailedException;
0556:
0557:            /**
0558:             * Perform a write request. This request should usually be broadcasted to all
0559:             * nodes.
0560:             * 
0561:             * @param request an <code>AbstractWriteRequest</code>
0562:             * @return number of rows affected by the request
0563:             * @throws AllBackendsFailedException if all backends failed to execute the
0564:             *           request
0565:             * @exception NoMoreBackendException if no backends are left to execute the
0566:             *              request
0567:             * @exception SQLException if an error occurs
0568:             */
0569:            public abstract ExecuteUpdateResult statementExecuteUpdate(
0570:                    AbstractWriteRequest request)
0571:                    throws AllBackendsFailedException, NoMoreBackendException,
0572:                    SQLException;
0573:
0574:            /**
0575:             * Perform a write request and return a ResultSet containing the auto
0576:             * generated keys.
0577:             * 
0578:             * @param request an <code>AbstractWriteRequest</code>
0579:             * @param metadataCache MetadataCache (null if none)
0580:             * @return auto generated keys
0581:             * @throws AllBackendsFailedException if all backends failed to execute the
0582:             *           request
0583:             * @exception NoMoreBackendException if no backends are left to execute the
0584:             *              request
0585:             * @exception SQLException if an error occurs
0586:             */
0587:            public abstract GeneratedKeysResult statementExecuteUpdateWithKeys(
0588:                    AbstractWriteRequest request, MetadataCache metadataCache)
0589:                    throws AllBackendsFailedException, NoMoreBackendException,
0590:                    SQLException;
0591:
0592:            /**
0593:             * Call a request that returns multiple results.
0594:             * 
0595:             * @param request the request to execute
0596:             * @param metadataCache MetadataCache (null if none)
0597:             * @return an <code>ExecuteResult</code> object
0598:             * @throws AllBackendsFailedException if all backends failed to execute the
0599:             *           request
0600:             * @throws SQLException if an error occurs
0601:             */
0602:            public abstract ExecuteResult statementExecute(
0603:                    AbstractRequest request, MetadataCache metadataCache)
0604:                    throws AllBackendsFailedException, SQLException;
0605:
0606:            /**
0607:             * Call a read-only stored procedure that returns a ResultSet. The stored
0608:             * procedure will be executed by one node only.
0609:             * 
0610:             * @param proc the stored procedure call
0611:             * @param metadataCache MetadataCache (null if none)
0612:             * @return a <code>ControllerResultSet</code> value
0613:             * @exception SQLException if an error occurs
0614:             */
0615:            public abstract ControllerResultSet readOnlyCallableStatementExecuteQuery(
0616:                    StoredProcedure proc, MetadataCache metadataCache)
0617:                    throws SQLException;
0618:
0619:            /**
0620:             * Call a read-only stored procedure that returns multiple results. The stored
0621:             * procedure will be executed by one node only.
0622:             * 
0623:             * @param proc the stored procedure call
0624:             * @param metadataCache MetadataCache (null if none)
0625:             * @return a <code>ExecuteResult</code> object containing all results
0626:             * @exception SQLException if an error occurs
0627:             */
0628:            public abstract ExecuteResult readOnlyCallableStatementExecute(
0629:                    StoredProcedure proc, MetadataCache metadataCache)
0630:                    throws SQLException;
0631:
0632:            /**
0633:             * Call a stored procedure that returns a ResultSet. This stored procedure can
0634:             * possibly perform writes and will therefore be executed by all nodes.
0635:             * 
0636:             * @param proc the stored procedure call
0637:             * @param metadataCache MetadataCache (null if none)
0638:             * @return a <code>ControllerResultSet</code> value
0639:             * @throws AllBackendsFailedException if all backends failed to execute the
0640:             *           request
0641:             * @exception SQLException if an error occurs
0642:             */
0643:            public abstract ControllerResultSet callableStatementExecuteQuery(
0644:                    StoredProcedure proc, MetadataCache metadataCache)
0645:                    throws AllBackendsFailedException, SQLException;
0646:
0647:            /**
0648:             * Call a stored procedure that performs an update.
0649:             * 
0650:             * @param proc the stored procedure call
0651:             * @return number of rows affected
0652:             * @throws AllBackendsFailedException if all backends failed to execute the
0653:             *           request
0654:             * @throws SQLException if an error occurs
0655:             */
0656:            public abstract ExecuteUpdateResult callableStatementExecuteUpdate(
0657:                    StoredProcedure proc) throws AllBackendsFailedException,
0658:                    SQLException;
0659:
0660:            /**
0661:             * Call a stored procedure that returns multiple results.
0662:             * 
0663:             * @param proc the stored procedure call
0664:             * @param metadataCache MetadataCache (null if none)
0665:             * @return an <code>ExecuteResult</code> object
0666:             * @throws AllBackendsFailedException if all backends failed to execute the
0667:             *           request
0668:             * @throws SQLException if an error occurs
0669:             */
0670:            public abstract ExecuteResult callableStatementExecute(
0671:                    StoredProcedure proc, MetadataCache metadataCache)
0672:                    throws AllBackendsFailedException, SQLException;
0673:
0674:            /**
0675:             * Return a ControllerResultSet containing the PreparedStatement metaData of
0676:             * the given sql template
0677:             * 
0678:             * @param request the request containing the sql template
0679:             * @return an empty ControllerResultSet with the metadata
0680:             * @throws SQLException if a database error occurs
0681:             */
0682:            public abstract ControllerResultSet getPreparedStatementGetMetaData(
0683:                    AbstractRequest request) throws SQLException;
0684:
0685:            /**
0686:             * Setup a Statement or a PreparedStatement (decoded if parameters of the
0687:             * request are not null).
0688:             */
0689:            private static Statement setupStatementOrPreparedStatement(
0690:                    AbstractRequest request, DatabaseBackend backend,
0691:                    BackendWorkerThread workerThread, Connection c,
0692:                    boolean setupResultSetParameters, boolean needGeneratedKeys)
0693:                    throws SQLException {
0694:                Statement s; // Can also be used as a PreparedStatement
0695:                if (request.getPreparedStatementParameters() == null)
0696:                    s = c.createStatement();
0697:                else {
0698:                    String rewrittenTemplate = backend.rewriteQuery(request
0699:                            .getSqlOrTemplate());
0700:                    if (needGeneratedKeys)
0701:                        s = c.prepareStatement(rewrittenTemplate,
0702:                                Statement.RETURN_GENERATED_KEYS);
0703:                    else
0704:                        s = c.prepareStatement(rewrittenTemplate);
0705:                    PreparedStatementSerialization.setPreparedStatement(request
0706:                            .getPreparedStatementParameters(),
0707:                            (PreparedStatement) s);
0708:                }
0709:
0710:                // Let the worker thread know which statement we are using in case there is
0711:                // a need to cancel that statement during its execution
0712:                if (workerThread != null)
0713:                    workerThread.setCurrentStatement(s);
0714:
0715:                DriverCompliance driverCompliance = backend
0716:                        .getDriverCompliance();
0717:                if (driverCompliance.supportSetQueryTimeout())
0718:                    s.setQueryTimeout(request.getTimeout());
0719:
0720:                if (setupResultSetParameters) {
0721:                    if ((request.getCursorName() != null)
0722:                            && (driverCompliance.supportSetCursorName()))
0723:                        s.setCursorName(request.getCursorName());
0724:                    if ((request.getFetchSize() != 0)
0725:                            && driverCompliance.supportSetFetchSize())
0726:                        s.setFetchSize(request.getFetchSize());
0727:                    if ((request.getMaxRows() > 0)
0728:                            && driverCompliance.supportSetMaxRows())
0729:                        s.setMaxRows(request.getMaxRows());
0730:                }
0731:                s.setEscapeProcessing(request.getEscapeProcessing());
0732:                return s;
0733:            }
0734:
0735:            /**
0736:             * Execute a statement on a backend. If the execution fails, the connection is
0737:             * checked for validity. If the connection was not valid, the query is
0738:             * automatically retried on a new connection.<br>
0739:             * 
0740:             * @param request the request to execute
0741:             * @param backend the backend on which the request is executed
0742:             * @param workerThread the backend worker thread executing this query (or null
0743:             *          if none)
0744:             * @param c connection used to create the statement
0745:             * @param metadataCache MetadataCache (null if none)
0746:             * @return the ControllerResultSet
0747:             * @throws SQLException if an error occurs
0748:             * @throws BadConnectionException if the connection was bad
0749:             * @throws UnreachableBackendException if the backend is unreachable
0750:             */
0751:            public static final ControllerResultSet executeStatementExecuteQueryOnBackend(
0752:                    SelectRequest request, DatabaseBackend backend,
0753:                    BackendWorkerThread workerThread, Connection c,
0754:                    MetadataCache metadataCache) throws SQLException,
0755:                    BadConnectionException, UnreachableBackendException {
0756:                ControllerResultSet rs = null;
0757:                ResultSet backendRS = null;
0758:                try {
0759:                    backend.addPendingReadRequest(request);
0760:
0761:                    Statement s = setupStatementOrPreparedStatement(request,
0762:                            backend, workerThread, c, true, false);
0763:
0764:                    // Execute the query
0765:                    if (request.getPreparedStatementParameters() == null)
0766:                        backendRS = s.executeQuery(backend.rewriteQuery(request
0767:                                .getSqlOrTemplate()));
0768:                    else
0769:                        backendRS = ((PreparedStatement) s).executeQuery();
0770:
0771:                    SQLWarning stWarns = null;
0772:                    if (request.getRetrieveSQLWarnings()) {
0773:                        stWarns = s.getWarnings();
0774:                    }
0775:                    rs = new ControllerResultSet(request, backendRS,
0776:                            metadataCache, s, false);
0777:                    rs.setStatementWarnings(stWarns);
0778:                } catch (SQLException e) { // Something bad happened
0779:                    /*
0780:                     * A persistent connection is a direct tie between the client program and
0781:                     * the backend database. If the connection is broken, it cannot be retried
0782:                     * on another connection on the same backend, because the client may be
0783:                     * depending on state that was associated with the broken connection.
0784:                     */
0785:                    if (backend.isValidConnection(c))
0786:                        throw e; // Connection is valid, throw the exception
0787:                    else if (request.isPersistentConnection())
0788:                        throw new UnreachableBackendException(
0789:                                "Bad persistent connection", e);
0790:                    else
0791:                        throw new BadConnectionException(e);
0792:                } finally {
0793:                    // we can close this resultset if fetch size was 0
0794:                    if (backendRS != null && request.getFetchSize() == 0)
0795:                        try {
0796:                            backendRS.close();
0797:                        } catch (SQLException ignore) {
0798:                        }
0799:                    backend.removePendingRequest(request);
0800:                }
0801:                return rs;
0802:            }
0803:
0804:            /**
0805:             * Execute an update prepared statement on a backend. If the execution fails,
0806:             * the connection is checked for validity. If the connection was not valid,
0807:             * the query is automatically retried on a new connection.
0808:             * 
0809:             * @param request the request to execute
0810:             * @param backend the backend on which the request is executed
0811:             * @param workerThread the backend worker thread executing this query (or null
0812:             *          if none)
0813:             * @param pc pooled connection used to create the statement
0814:             * @return int Number of rows effected
0815:             * @throws SQLException if an error occurs
0816:             * @throws BadConnectionException if the connection was bad
0817:             */
0818:            public static final ExecuteUpdateResult executeStatementExecuteUpdateOnBackend(
0819:                    AbstractWriteRequest request, DatabaseBackend backend,
0820:                    BackendWorkerThread workerThread, PooledConnection pc)
0821:                    throws SQLException, BadConnectionException {
0822:                Statement s = null;
0823:                Connection c = pc.getConnection();
0824:                try {
0825:                    backend.addPendingWriteRequest(request);
0826:
0827:                    s = setupStatementOrPreparedStatement(request, backend,
0828:                            workerThread, c, false, false);
0829:
0830:                    if (request.requiresConnectionFlush()) {
0831:                        pc.setMustBeRenewed(true);
0832:                    }
0833:
0834:                    // Execute the query
0835:                    ExecuteUpdateResult eur;
0836:                    if (request.getPreparedStatementParameters() == null)
0837:                        eur = new ExecuteUpdateResult(s.executeUpdate(backend
0838:                                .rewriteQuery(request.getSqlOrTemplate())));
0839:                    else
0840:                        eur = new ExecuteUpdateResult(((PreparedStatement) s)
0841:                                .executeUpdate());
0842:                    // get warnings, if required
0843:                    if (request.getRetrieveSQLWarnings())
0844:                        eur.setStatementWarnings(s.getWarnings());
0845:
0846:                    if (request.requiresConnectionPoolFlush())
0847:                        backend.flagAllConnectionsForRenewal();
0848:
0849:                    if (request instanceof  CreateRequest
0850:                            && ((CreateRequest) request)
0851:                                    .createsTemporaryTable())
0852:                        pc.addTemporaryTables(request.getTableName());
0853:                    return eur;
0854:                } catch (SQLException e) { // Something bad happened
0855:                    if (backend.isValidConnection(c))
0856:                        throw e; // Connection is valid, throw the exception
0857:                    else
0858:                        throw new BadConnectionException(e);
0859:                } finally {
0860:                    backend.removePendingRequest(request);
0861:                    try {
0862:                        if (s != null)
0863:                            s.close();
0864:                    } catch (SQLException ignore) {
0865:                    }
0866:                }
0867:            }
0868:
0869:            /**
0870:             * Execute an update prepared statement on a backend. If the execution fails,
0871:             * the connection is checked for validity. If the connection was not valid,
0872:             * the query is automatically retried on a new connection.
0873:             * 
0874:             * @param request the request to execute
0875:             * @param backend the backend on which the request is executed
0876:             * @param workerThread the backend worker thread executing this query (or null
0877:             *          if none)
0878:             * @param pc connection used to create the statement
0879:             * @param metadataCache MetadataCache (null if none)
0880:             * @return ControllerResultSet containing the auto-generated keys
0881:             * @throws SQLException if an error occurs
0882:             * @throws BadConnectionException if the connection was bad
0883:             */
0884:            public static final GeneratedKeysResult executeStatementExecuteUpdateWithKeysOnBackend(
0885:                    AbstractWriteRequest request, DatabaseBackend backend,
0886:                    BackendWorkerThread workerThread, PooledConnection pc,
0887:                    MetadataCache metadataCache) throws SQLException,
0888:                    BadConnectionException {
0889:                if (!backend.getDriverCompliance().supportGetGeneratedKeys())
0890:                    throw new SQLException("Backend " + backend.getName()
0891:                            + " does not support RETURN_GENERATED_KEYS");
0892:
0893:                Statement s = null;
0894:                Connection c = pc.getConnection();
0895:                try {
0896:                    backend.addPendingWriteRequest(request);
0897:
0898:                    s = setupStatementOrPreparedStatement(request, backend,
0899:                            workerThread, c, false, true);
0900:
0901:                    if (request.requiresConnectionFlush()) {
0902:                        pc.setMustBeRenewed(true);
0903:                    }
0904:                    // Execute the query
0905:                    int updateCount;
0906:                    if (request.getPreparedStatementParameters() == null)
0907:                        updateCount = s.executeUpdate(backend
0908:                                .rewriteQuery(request.getSqlOrTemplate()),
0909:                                Statement.RETURN_GENERATED_KEYS);
0910:                    else
0911:                        updateCount = ((PreparedStatement) s).executeUpdate();
0912:                    // get warnings, if required
0913:                    SQLWarning stWarns = null;
0914:                    if (request.getRetrieveSQLWarnings()) {
0915:                        stWarns = s.getWarnings();
0916:                    }
0917:                    ControllerResultSet rs = new ControllerResultSet(request, s
0918:                            .getGeneratedKeys(), metadataCache, s, false);
0919:                    GeneratedKeysResult gkr = new GeneratedKeysResult(rs,
0920:                            updateCount);
0921:                    gkr.setStatementWarnings(stWarns);
0922:
0923:                    if (request.requiresConnectionPoolFlush())
0924:                        backend.flagAllConnectionsForRenewal();
0925:
0926:                    return gkr;
0927:                } catch (SQLException e) { // Something bad happened
0928:                    if (backend.isValidConnection(c))
0929:                        throw e; // Connection is valid, throw the exception
0930:                    else
0931:                        throw new BadConnectionException(e);
0932:                } finally {
0933:                    backend.removePendingRequest(request);
0934:                    try {
0935:                        if (s != null)
0936:                            s.close();
0937:                    } catch (SQLException ignore) {
0938:                    }
0939:                }
0940:            }
0941:
0942:            /**
0943:             * Execute a request that returns multiple results on the given backend. The
0944:             * statement is setXXX if the driver has not processed the statement.
0945:             * 
0946:             * @param request the request to execute
0947:             * @param backend the backend on which to execute the stored procedure
0948:             * @param workerThread the backend worker thread executing this query (or null
0949:             *          if none)
0950:             * @param pc the connection on which to execute the stored procedure
0951:             * @param metadataCache the matedatacache to build the ControllerResultSet
0952:             * @return an <code>ExecuteResult</code> object
0953:             * @throws SQLException if an error occurs
0954:             * @throws BadConnectionException if the connection was bad
0955:             */
0956:            public static final ExecuteResult executeStatementExecuteOnBackend(
0957:                    AbstractRequest request, DatabaseBackend backend,
0958:                    BackendWorkerThread workerThread, PooledConnection pc,
0959:                    MetadataCache metadataCache) throws SQLException,
0960:                    BadConnectionException {
0961:                Statement s = null;
0962:                Connection c = pc.getConnection();
0963:                try {
0964:                    backend.addPendingWriteRequest(request);
0965:
0966:                    // Disable fetch size when using execute()
0967:                    request.setFetchSize(0);
0968:
0969:                    s = setupStatementOrPreparedStatement(request, backend,
0970:                            workerThread, c, true, false);
0971:
0972:                    if (request.requiresConnectionFlush()) {
0973:                        pc.setMustBeRenewed(true);
0974:                    }
0975:                    // Execute the query
0976:                    boolean hasResult;
0977:                    if (request.getPreparedStatementParameters() == null)
0978:                        hasResult = s.execute(backend.rewriteQuery(request
0979:                                .getSqlOrTemplate()));
0980:                    else
0981:                        hasResult = ((PreparedStatement) s).execute();
0982:
0983:                    int updatedRows = 0;
0984:                    // Process the result and get all ResultSets or udpate counts
0985:                    ExecuteResult result = new ExecuteResult();
0986:                    // get warnings, if required
0987:                    if (request.getRetrieveSQLWarnings())
0988:                        result.setStatementWarnings(s.getWarnings());
0989:                    int niter = 0;
0990:                    do {
0991:                        if (hasResult) {
0992:                            ControllerResultSet crs = new ControllerResultSet(
0993:                                    request, s.getResultSet(), metadataCache,
0994:                                    null, true);
0995:                            result.addResult(crs);
0996:                        } else {
0997:                            updatedRows = s.getUpdateCount();
0998:                            result.addResult(updatedRows);
0999:                        }
1000:                        hasResult = s.getMoreResults();
1001:                        niter++;
1002:                        logUnreasonableNumberOfIterations(niter);
1003:                    } while (hasResult || (updatedRows != -1));
1004:
1005:                    if (request.requiresConnectionPoolFlush())
1006:                        backend.flagAllConnectionsForRenewal();
1007:
1008:                    return result;
1009:                } catch (SQLException e) { // Something bad happened
1010:                    if (backend.isValidConnection(c))
1011:                        throw e; // Connection is valid, throw the exception
1012:                    else
1013:                        throw new BadConnectionException(e);
1014:                } finally {
1015:                    backend.removePendingRequest(request);
1016:                    try {
1017:                        if (s != null)
1018:                            s.close();
1019:                    } catch (SQLException ignore) {
1020:                    }
1021:                }
1022:            }
1023:
1024:            /**
1025:             * Fetch Out and Named parameters if any. The information about the parameters
1026:             * is found in the StoredProcedure object and the results are stored in the
1027:             * same structures.
1028:             * <p>
1029:             * After calling this method, the stored procedure object does not contain
1030:             * anymore the types of the parameters but their returned values.
1031:             * 
1032:             * @param cs callable statement to fetch from
1033:             * @param proc stored procedure object with parameters information
1034:             * @throws SQLException if an error occurs during fetching
1035:             */
1036:            private static void fetchOutAndNamedParameters(
1037:                    CallableStatement cs, StoredProcedure proc)
1038:                    throws SQLException {
1039:                // First fetch the out parameters
1040:                List outParamIndexes = proc.getOutParameterIndexes();
1041:                if (outParamIndexes != null) {
1042:                    for (Iterator iter = outParamIndexes.iterator(); iter
1043:                            .hasNext();) {
1044:                        Object index = iter.next();
1045:                        if (index instanceof  Integer)
1046:                            proc.setOutParameterValue(index, cs
1047:                                    .getObject(((Integer) index).intValue()));
1048:                        else
1049:                            // Named OUT parameter
1050:                            proc.setOutParameterValue(index, cs
1051:                                    .getObject((String) index));
1052:                    }
1053:                }
1054:
1055:                // Fetch the named parameters
1056:                List namedParamNames = proc.getNamedParameterNames();
1057:                if (namedParamNames != null) {
1058:                    for (Iterator iter = namedParamNames.iterator(); iter
1059:                            .hasNext();) {
1060:                        // Overwrite the type with the result (re-use the same map)
1061:                        String paramName = (String) iter.next();
1062:                        proc.setNamedParameterValue(paramName, cs
1063:                                .getObject(paramName));
1064:                    }
1065:                }
1066:            }
1067:
1068:            /**
1069:             * Setup a Statement or a PreparedStatement (decoded if a SQL template is
1070:             * found in the request).
1071:             */
1072:            private static CallableStatement setupCallableStatement(
1073:                    StoredProcedure proc, DatabaseBackend backend,
1074:                    BackendWorkerThread workerThread, Connection c,
1075:                    boolean setupResultSetParameters) throws SQLException {
1076:                CallableStatement cs; // Can also be used as a PreparedStatement
1077:                cs = c.prepareCall(backend
1078:                        .rewriteQuery(proc.getSqlOrTemplate()));
1079:                if (proc.getPreparedStatementParameters() != null)
1080:                    PreparedStatementSerialization.setCallableStatement(
1081:                            backend.rewriteQuery(proc
1082:                                    .getPreparedStatementParameters()), cs,
1083:                            proc);
1084:
1085:                // Let the worker thread know which statement we are using in case there is
1086:                // a need to cancel that statement during its execution
1087:                if (workerThread != null)
1088:                    workerThread.setCurrentStatement(cs);
1089:
1090:                DriverCompliance driverCompliance = backend
1091:                        .getDriverCompliance();
1092:                if (driverCompliance.supportSetQueryTimeout())
1093:                    cs.setQueryTimeout(proc.getTimeout());
1094:
1095:                if (setupResultSetParameters) {
1096:                    if ((proc.getCursorName() != null)
1097:                            && (driverCompliance.supportSetCursorName()))
1098:                        cs.setCursorName(proc.getCursorName());
1099:                    if ((proc.getFetchSize() != 0)
1100:                            && driverCompliance.supportSetFetchSize())
1101:                        cs.setFetchSize(proc.getFetchSize());
1102:                    if ((proc.getMaxRows() > 0)
1103:                            && driverCompliance.supportSetMaxRows())
1104:                        cs.setMaxRows(proc.getMaxRows());
1105:                }
1106:                cs.setEscapeProcessing(proc.getEscapeProcessing());
1107:                return cs;
1108:            }
1109:
1110:            /**
1111:             * Execute a read stored procedure on the given backend. The callable
1112:             * statement is setXXX if the driver has not processed the statement.<br>
1113:             * 
1114:             * @param proc the stored procedure to execute
1115:             * @param backend the backend on which to execute the stored procedure
1116:             * @param workerThread the backend worker thread executing this query (or null
1117:             *          if none)
1118:             * @param c the connection on which to execute the stored procedure
1119:             * @param metadataCache the matedatacache to build the ControllerResultSet
1120:             * @return the controllerResultSet
1121:             * @throws SQLException if an error occurs
1122:             * @throws BadConnectionException if the connection was bad
1123:             */
1124:            public static final ControllerResultSet executeCallableStatementExecuteQueryOnBackend(
1125:                    StoredProcedure proc, DatabaseBackend backend,
1126:                    BackendWorkerThread workerThread, Connection c,
1127:                    MetadataCache metadataCache) throws SQLException,
1128:                    BadConnectionException {
1129:                CallableStatement cs = null;
1130:                ResultSet backendRS = null;
1131:                try {
1132:                    backend.addPendingReadRequest(proc);
1133:
1134:                    cs = setupCallableStatement(proc, backend, workerThread, c,
1135:                            true);
1136:
1137:                    // Execute the query
1138:                    backendRS = cs.executeQuery();
1139:
1140:                    SQLWarning stWarns = null;
1141:                    if (proc.getRetrieveSQLWarnings()) {
1142:                        stWarns = cs.getWarnings();
1143:                    }
1144:                    ControllerResultSet rs = new ControllerResultSet(proc,
1145:                            backendRS, metadataCache, cs, false);
1146:                    rs.setStatementWarnings(stWarns);
1147:                    fetchOutAndNamedParameters(cs, proc);
1148:
1149:                    if (proc.requiresConnectionPoolFlush())
1150:                        backend.flagAllConnectionsForRenewal();
1151:
1152:                    return rs;
1153:                } catch (SQLException e) { // Something bad happened
1154:                    if (backend.isValidConnection(c))
1155:                        throw e; // Connection is valid, throw the exception
1156:                    else
1157:                        throw new BadConnectionException(e);
1158:                } finally {
1159:                    // we can close this resultset if fetch size was 0
1160:                    if (backendRS != null && proc.getFetchSize() == 0) {
1161:                        try {
1162:                            backendRS.close();
1163:                        } catch (SQLException ignore) {
1164:                        }
1165:                    }
1166:                    backend.removePendingRequest(proc);
1167:                }
1168:            }
1169:
1170:            /**
1171:             * Execute a write stored procedure on the given backend. The callable
1172:             * statement is setXXX if the driver has not processed the statement.
1173:             * 
1174:             * @param proc the stored procedure to execute
1175:             * @param backend the backend on which to execute the stored procedure
1176:             * @param workerThread the backend worker thread executing this query (or null
1177:             *          if none)
1178:             * @param c the connection on which to execute the stored procedure
1179:             * @return the number of updated rows
1180:             * @throws SQLException if an error occurs
1181:             * @throws BadConnectionException if the connection was bad
1182:             */
1183:            public static final ExecuteUpdateResult executeCallableStatementExecuteUpdateOnBackend(
1184:                    StoredProcedure proc, DatabaseBackend backend,
1185:                    BackendWorkerThread workerThread, PooledConnection pc)
1186:                    throws SQLException, BadConnectionException {
1187:                CallableStatement cs = null;
1188:                Connection c = pc.getConnection();
1189:                try {
1190:                    backend.addPendingWriteRequest(proc);
1191:
1192:                    cs = setupCallableStatement(proc, backend, workerThread, c,
1193:                            false);
1194:
1195:                    if (proc.requiresConnectionFlush()) {
1196:                        pc.setMustBeRenewed(true);
1197:                    }
1198:
1199:                    // Execute the query
1200:                    ExecuteUpdateResult eur = new ExecuteUpdateResult(cs
1201:                            .executeUpdate());
1202:                    // get warnings, if required
1203:                    if (proc.getRetrieveSQLWarnings())
1204:                        eur.setStatementWarnings(cs.getWarnings());
1205:
1206:                    fetchOutAndNamedParameters(cs, proc);
1207:
1208:                    if (proc.requiresConnectionPoolFlush())
1209:                        backend.flagAllConnectionsForRenewal();
1210:
1211:                    return eur;
1212:                } catch (SQLException e) { // Something bad happened
1213:                    if (backend.isValidConnection(c))
1214:                        throw e; // Connection is valid, throw the exception
1215:                    else
1216:                        throw new BadConnectionException(e);
1217:                } finally {
1218:                    backend.removePendingRequest(proc);
1219:                    try {
1220:                        if (cs != null)
1221:                            cs.close();
1222:                    } catch (SQLException ignore) {
1223:                    }
1224:                }
1225:            }
1226:
1227:            /**
1228:             * Execute a stored procedure that returns multiple results on the given
1229:             * backend. The callable statement is setXXX if the driver has not processed
1230:             * the statement.
1231:             * 
1232:             * @param proc the stored procedure to execute
1233:             * @param backend the backend on which to execute the stored procedure
1234:             * @param workerThread the backend worker thread executing this query (or null
1235:             *          if none)
1236:             * @param c the connection on which to execute the stored procedure
1237:             * @param metadataCache the matedatacache to build the ControllerResultSet
1238:             * @return an <code>ExecuteResult</code> object
1239:             * @throws SQLException if an error occurs
1240:             * @throws BadConnectionException if the connection was bad
1241:             */
1242:            public static final ExecuteResult executeCallableStatementExecuteOnBackend(
1243:                    StoredProcedure proc, DatabaseBackend backend,
1244:                    BackendWorkerThread workerThread, PooledConnection pc,
1245:                    MetadataCache metadataCache) throws SQLException,
1246:                    BadConnectionException {
1247:                CallableStatement cs = null;
1248:                Connection c = pc.getConnection();
1249:                try {
1250:                    backend.addPendingWriteRequest(proc);
1251:
1252:                    // Disable fetch size when using execute()
1253:                    proc.setFetchSize(0);
1254:
1255:                    cs = setupCallableStatement(proc, backend, workerThread, c,
1256:                            true);
1257:
1258:                    if (proc.requiresConnectionFlush()) {
1259:                        pc.setMustBeRenewed(true);
1260:                    }
1261:
1262:                    // Execute the query
1263:                    boolean hasResult = cs.execute();
1264:                    int updatedRows = 0;
1265:                    // Process the result and get all ResultSets or udpate counts
1266:                    ExecuteResult result = new ExecuteResult();
1267:                    // get warnings, if required
1268:                    if (proc.getRetrieveSQLWarnings())
1269:                        result.setStatementWarnings(cs.getWarnings());
1270:                    int niter = 0;
1271:                    do {
1272:                        if (hasResult) {
1273:                            ControllerResultSet crs = new ControllerResultSet(
1274:                                    proc, cs.getResultSet(), metadataCache,
1275:                                    null, true);
1276:                            result.addResult(crs);
1277:                        } else {
1278:                            updatedRows = cs.getUpdateCount();
1279:                            result.addResult(updatedRows);
1280:                        }
1281:                        if (updatedRows != -1)
1282:                            hasResult = cs.getMoreResults();
1283:
1284:                        niter++;
1285:                        logUnreasonableNumberOfIterations(niter);
1286:                    } while (hasResult || (updatedRows != -1));
1287:
1288:                    fetchOutAndNamedParameters(cs, proc);
1289:
1290:                    if (proc.requiresConnectionPoolFlush())
1291:                        backend.flagAllConnectionsForRenewal();
1292:
1293:                    return result;
1294:                } catch (SQLException e) { // Something bad happened
1295:                    if (backend.isValidConnection(c))
1296:                        throw e; // Connection is valid, throw the exception
1297:                    else
1298:                        throw new BadConnectionException(e);
1299:                } finally {
1300:                    backend.removePendingRequest(proc);
1301:                    try {
1302:                        if (cs != null)
1303:                            cs.close();
1304:                    } catch (SQLException ignore) {
1305:                    }
1306:                }
1307:            }
1308:
1309:            /**
1310:             * Sanity check to log buggy JDBC drivers triggering infinite loops. Logs at
1311:             * regular intervals. Warning: does log on zero, so please start at 1!
1312:             * 
1313:             * @param niter number to check
1314:             */
1315:            private static void logUnreasonableNumberOfIterations(int niter) {
1316:                if (niter % 1024 != 0)
1317:                    return;
1318:
1319:                // The time has come...
1320:                Throwable t = new Throwable(); // get a stacktrace
1321:                logger.warn(niter + " getMoreResults() iterations", t);
1322:            }
1323:
1324:            /**
1325:             * Get PreparedStatement metadata before the statement is executed.
1326:             * 
1327:             * @param sqlTemplate the PreparedStatement sql template
1328:             * @param backend the backend on which we execute the request
1329:             * @param c the connection to create the statement from
1330:             * @return an empty ResultSet with the associated metadata
1331:             * @throws SQLException if an error occurs
1332:             * @throws BadConnectionException if the databsae connection was bad
1333:             */
1334:            public static final ControllerResultSet preparedStatementGetMetaDataOnBackend(
1335:                    String sqlTemplate, DatabaseBackend backend, Connection c)
1336:                    throws SQLException, BadConnectionException {
1337:                try {
1338:                    PreparedStatement ps = c.prepareStatement(sqlTemplate);
1339:                    return new ControllerResultSet(
1340:                            ControllerConstants.CONTROLLER_FACTORY
1341:                                    .getResultSetMetaDataFactory()
1342:                                    .copyResultSetMetaData(ps.getMetaData(),
1343:                                            null), new ArrayList());
1344:                } catch (SQLException e) { // Something bad happened
1345:                    if (backend.isValidConnection(c))
1346:                        throw e; // Connection is valid, throw the exception
1347:                    else
1348:                        throw new BadConnectionException(e);
1349:                }
1350:            }
1351:
1352:            //
1353:            // Transaction management
1354:            //
1355:
            /**
             * Aborts a transaction together with all of its queries that are currently
             * pending or executing. Implemented by the concrete load balancers.
             * 
             * @param tm the transaction marker metadata of the transaction to abort
             * @throws SQLException if an error occurs
             */
            public abstract void abort(TransactionMetaData tm)
                    throws SQLException;
1364:
            /**
             * Begins a new transaction. Implemented by the concrete load balancers.
             * 
             * @param tm the transaction marker metadata of the transaction to begin
             * @throws SQLException if an error occurs
             */
            public abstract void begin(TransactionMetaData tm)
                    throws SQLException;
1373:
            /**
             * Commits a transaction. Implemented by the concrete load balancers.
             * 
             * @param tm the transaction marker metadata of the transaction to commit
             * @throws AllBackendsFailedException if all backends failed to execute the
             *           request
             * @throws SQLException if an error occurs
             */
            public abstract void commit(TransactionMetaData tm)
                    throws AllBackendsFailedException, SQLException;
1384:
            /**
             * Rolls back a transaction. Implemented by the concrete load balancers.
             * 
             * @param tm the transaction marker metadata of the transaction to roll back
             * @throws AllBackendsFailedException if all backends failed to execute the
             *           request
             * @throws SQLException if an error occurs
             */
            public abstract void rollback(TransactionMetaData tm)
                    throws AllBackendsFailedException, SQLException;
1395:
            /**
             * Rolls back a transaction to a savepoint. Implemented by the concrete load
             * balancers.
             * 
             * @param tm the transaction marker metadata of the transaction to roll back
             * @param savepointName the name of the savepoint to roll back to
             * @throws AllBackendsFailedException if all backends failed to execute the
             *           request
             * @throws SQLException if an error occurs
             */
            public abstract void rollbackToSavepoint(TransactionMetaData tm,
                    String savepointName) throws AllBackendsFailedException,
                    SQLException;
1408:
            /**
             * Sets a savepoint in a transaction. Implemented by the concrete load
             * balancers.
             * 
             * @param tm the transaction marker metadata of the transaction
             * @param name the name of the new savepoint
             * @throws AllBackendsFailedException if all backends failed to execute the
             *           request
             * @throws SQLException if an error occurs
             */
            public abstract void setSavepoint(TransactionMetaData tm,
                    String name) throws AllBackendsFailedException,
                    SQLException;
1421:
            /**
             * Releases a savepoint from a transaction. Implemented by the concrete load
             * balancers.
             * 
             * @param tm the transaction marker metadata of the transaction
             * @param name the name of the savepoint to release
             * @throws AllBackendsFailedException if all backends failed to execute the
             *           request
             * @throws SQLException if an error occurs
             */
            public abstract void releaseSavepoint(TransactionMetaData tm,
                    String name) throws AllBackendsFailedException,
                    SQLException;
1434:
            /**
             * Closes a persistent connection. Implemented by the concrete load
             * balancers.
             * 
             * @param login login requesting that the connection be closed
             * @param persistentConnectionId id of the persistent connection to close
             * @throws SQLException if an error occurs
             */
            public abstract void closePersistentConnection(String login,
                    long persistentConnectionId) throws SQLException;
1444:
            /**
             * Opens a persistent connection. Implemented by the concrete load
             * balancers.
             * 
             * @param login login requesting that the connection be opened
             * @param persistentConnectionId id of the persistent connection to open
             * @throws SQLException if an error occurs
             */
            public abstract void openPersistentConnection(String login,
                    long persistentConnectionId) throws SQLException;
1454:
1455:            /**
1456:             * Factorized code to start a transaction on a backend and to retrieve a
1457:             * connection on this backend
1458:             * 
1459:             * @param backend the backend needed to check valid connection against this
1460:             *          backend test statement
1461:             * @param cm the connection manager to use to retrieve connections
1462:             * @param request request that will execute (must carry transaction id and
1463:             *          transaction isolation level (does nothing if equals to
1464:             *          Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL))
1465:             * @return a valid connection with a started transaction
1466:             * @throws SQLException if the backend is valid but set autocommit cannot be
1467:             *           set to false
1468:             * @throws UnreachableBackendException if the backend is not reachable, ie not
1469:             *           valid connection can be retrieved
1470:             * @see org.continuent.sequoia.driver.Connection#DEFAULT_TRANSACTION_ISOLATION_LEVEL
1471:             */
1472:            public static final Connection getConnectionAndBeginTransaction(
1473:                    DatabaseBackend backend, AbstractConnectionManager cm,
1474:                    AbstractRequest request) throws SQLException,
1475:                    UnreachableBackendException {
1476:                PooledConnection pc = null;
1477:                boolean isConnectionValid = false;
1478:                Connection c;
1479:
1480:                do {
1481:                    if (request.isPersistentConnection()) { // Retrieve the persistent connection and register it for the
1482:                        // transaction
1483:                        pc = cm.retrieveConnectionInAutoCommit(request);
1484:                        cm.registerConnectionForTransaction(pc, request
1485:                                .getTransactionId());
1486:                    } else { // Get a new connection for the transaction
1487:                        pc = cm.getConnectionForTransaction(request
1488:                                .getTransactionId());
1489:                    }
1490:
1491:                    // Sanity check
1492:                    if (pc == null) {
1493:                        throw new UnreachableBackendException(Translate.get(
1494:                                "loadbalancer.unable.get.connection",
1495:                                new String[] {
1496:                                        String.valueOf(request
1497:                                                .getTransactionId()),
1498:                                        backend.getName() }));
1499:                    }
1500:                    c = pc.getConnection();
1501:                    try {
1502:                        if (request.getTransactionIsolation() != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL) {
1503:                            /*
1504:                             * A user specified transaction isolation will prevail on any other
1505:                             * settings
1506:                             */
1507:                            pc.setTransactionIsolation(request
1508:                                    .getTransactionIsolation());
1509:                        } else if (defaultTransactionIsolationLevel != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL) {
1510:                            /*
1511:                             * The defaultTransactionIsolationLevel can be enforced in the
1512:                             * configuration file to force all transactions to use this level of
1513:                             * isolation
1514:                             */
1515:                            pc
1516:                                    .setTransactionIsolation(defaultTransactionIsolationLevel);
1517:                        }
1518:
1519:                        c.setAutoCommit(false);
1520:                        isConnectionValid = true;
1521:                    } catch (SQLException e) {
1522:                        if (backend.isValidConnection(c))
1523:                            throw e; // Connection is valid, throw the exception
1524:                        else {
1525:                            cm.deleteConnection(request.getTransactionId());
1526:                            if (request.isPersistentConnection()) {
1527:                                cm.deletePersistentConnection(request
1528:                                        .getPersistentConnectionId());
1529:                            }
1530:                        }
1531:                    }
1532:                } while (!isConnectionValid);
1533:                if (pc == null) {
1534:                    if (logger.isErrorEnabled()) {
1535:                        logger.error("Got a null connection [backend="
1536:                                + backend.getName() + ", tid="
1537:                                + request.getTransactionId() + "]");
1538:                    }
1539:                }
1540:                return c;
1541:            }
1542:
1543:            //
1544:            // Backends management
1545:            //
1546:
            /**
             * Enables a backend without further check. The backend is at least read
             * enabled but could also be enabled for writes. Asks the corresponding
             * connection manager to initialize the connections if needed. Implemented
             * by the concrete load balancers.
             * 
             * @param db the database backend to enable
             * @param writeEnabled true if the backend must be enabled for writes
             * @throws SQLException if an error occurs
             */
            public abstract void enableBackend(DatabaseBackend db,
                    boolean writeEnabled) throws SQLException;
1558:
            /**
             * Disables a backend without further check. Asks the corresponding
             * connection manager to finalize the connections if needed. Callers should
             * not invoke this method directly but go through the
             * <code>RequestManager.disableBackend(...)</code> method instead.
             * Implemented by the concrete load balancers.
             * 
             * @param db the database backend to disable
             * @param forceDisable true if disable must be forced
             * @throws SQLException if an error occurs
             */
            public abstract void disableBackend(DatabaseBackend db,
                    boolean forceDisable) throws SQLException;
1571:
1572:            /**
1573:             * Get the number of currently enabled backends. 0 means that no backend is
1574:             * available.
1575:             * 
1576:             * @return number of currently enabled backends
1577:             */
1578:            public int getNumberOfEnabledBackends() {
1579:                return enabledBackends.size();
1580:            }
1581:
            /**
             * Returns information about this request load balancer. Implemented by the
             * concrete load balancers.
             * 
             * @return <code>String</code> containing information
             */
            public abstract String getInformation();
1588:
            /**
             * Returns information about this request load balancer in xml. Implemented
             * by the concrete load balancers; {@link #getXml()} embeds the returned
             * text inside the load balancer element it generates.
             * 
             * @return <code>String</code> containing information, xml formatted
             */
            public abstract String getXmlImpl();
1595:
1596:            /**
1597:             * @see org.continuent.sequoia.common.xml.XmlComponent#getXml()
1598:             */
1599:            public String getXml() {
1600:                StringBuffer info = new StringBuffer();
1601:                info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + " "
1602:                        + DatabasesXmlTags.ATT_transactionIsolation + "=\"");
1603:                switch (defaultTransactionIsolationLevel) {
1604:                case Connection.TRANSACTION_READ_UNCOMMITTED:
1605:                    info.append(DatabasesXmlTags.VAL_readUncommitted);
1606:                    break;
1607:                case Connection.TRANSACTION_READ_COMMITTED:
1608:                    info.append(DatabasesXmlTags.VAL_readCommitted);
1609:                    break;
1610:                case Connection.TRANSACTION_REPEATABLE_READ:
1611:                    info.append(DatabasesXmlTags.VAL_repeatableRead);
1612:                    break;
1613:                case Connection.TRANSACTION_SERIALIZABLE:
1614:                    info.append(DatabasesXmlTags.VAL_serializable);
1615:                    break;
1616:                default:
1617:                    info.append(DatabasesXmlTags.VAL_databaseDefault);
1618:                    break;
1619:                }
1620:                info.append("\">");
1621:                info.append(getXmlImpl());
1622:                info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">");
1623:                return info.toString();
1624:            }
1625:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.