Source Code Cross Referenced for RequestManager.java in » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » requestmanager » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.requestmanager 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004:         * Science And Control (INRIA).
0005:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006:         * Copyright (C) 2005-2006 Continuent, Inc.
0007:         * Contact: sequoia@continuent.org
0008:         *
0009:         * Licensed under the Apache License, Version 2.0 (the "License");
0010:         * you may not use this file except in compliance with the License.
0011:         * You may obtain a copy of the License at
0012:         *
0013:         * http://www.apache.org/licenses/LICENSE-2.0
0014:         *
0015:         * Unless required by applicable law or agreed to in writing, software
0016:         * distributed under the License is distributed on an "AS IS" BASIS,
0017:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018:         * See the License for the specific language governing permissions and
0019:         * limitations under the License.
0020:         *
0021:         * Initial developer(s): Emmanuel Cecchet.
0022:         * Contributor(s): Julie Marguerite, Greg Ward, Nicolas Modrzyk, Vadim Kassin,
0023:         *   Jean-Bernard van Zuylen, Peter Royal.
0024:         */package org.continuent.sequoia.controller.requestmanager;
0025:
0026:        import java.sql.SQLException;
0027:        import java.util.ArrayList;
0028:        import java.util.Date;
0029:        import java.util.Hashtable;
0030:        import java.util.Iterator;
0031:        import java.util.LinkedList;
0032:        import java.util.List;
0033:        import java.util.SortedSet;
0034:        import java.util.Vector;
0035:
0036:        import javax.management.NotCompliantMBeanException;
0037:
0038:        import org.continuent.sequoia.common.exceptions.BackupException;
0039:        import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0040:        import org.continuent.sequoia.common.exceptions.RollbackException;
0041:        import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
0042:        import org.continuent.sequoia.common.i18n.Translate;
0043:        import org.continuent.sequoia.common.jmx.JmxConstants;
0044:        import org.continuent.sequoia.common.jmx.management.BackendInfo;
0045:        import org.continuent.sequoia.common.jmx.management.BackendState;
0046:        import org.continuent.sequoia.common.jmx.management.DumpInfo;
0047:        import org.continuent.sequoia.common.jmx.mbeans.RequestManagerMBean;
0048:        import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
0049:        import org.continuent.sequoia.common.log.Trace;
0050:        import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0051:        import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0052:        import org.continuent.sequoia.common.sql.schema.DatabaseTable;
0053:        import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0054:        import org.continuent.sequoia.common.xml.XmlComponent;
0055:        import org.continuent.sequoia.controller.backend.BackendStateListener;
0056:        import org.continuent.sequoia.controller.backend.DatabaseBackend;
0057:        import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0058:        import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0059:        import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0060:        import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0061:        import org.continuent.sequoia.controller.backup.BackupManager;
0062:        import org.continuent.sequoia.controller.backup.Backuper;
0063:        import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0064:        import org.continuent.sequoia.controller.cache.parsing.ParsingCache;
0065:        import org.continuent.sequoia.controller.cache.result.AbstractResultCache;
0066:        import org.continuent.sequoia.controller.cache.result.entries.AbstractResultCacheEntry;
0067:        import org.continuent.sequoia.controller.core.ControllerConstants;
0068:        import org.continuent.sequoia.controller.jmx.AbstractStandardMBean;
0069:        import org.continuent.sequoia.controller.jmx.MBeanServerManager;
0070:        import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0071:        import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0072:        import org.continuent.sequoia.controller.loadbalancer.LoadBalancerControl;
0073:        import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0074:        import org.continuent.sequoia.controller.recoverylog.BackendRecoveryInfo;
0075:        import org.continuent.sequoia.controller.recoverylog.RecoverThread;
0076:        import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0077:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0078:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0079:        import org.continuent.sequoia.controller.requests.AlterRequest;
0080:        import org.continuent.sequoia.controller.requests.CreateRequest;
0081:        import org.continuent.sequoia.controller.requests.DropRequest;
0082:        import org.continuent.sequoia.controller.requests.ParsingGranularities;
0083:        import org.continuent.sequoia.controller.requests.RequestFactory;
0084:        import org.continuent.sequoia.controller.requests.RequestType;
0085:        import org.continuent.sequoia.controller.requests.SelectRequest;
0086:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0087:        import org.continuent.sequoia.controller.requests.UpdateRequest;
0088:        import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0089:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0090:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread;
0091:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0092:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0093:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0094:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0095:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0096:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0097:
0098:        /**
0099:         * This class defines the Request Manager.
0100:         * <p>
0101:         * The RM is composed of a Request Scheduler, an optional Query Cache, a
0102:         * Load Balancer and an optional Recovery Log.
0103:         *
0104:         * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0105:         * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
0106:         * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
0107:         * @author <a href="mailto:vadim@kase.kz">Vadim Kassin </a>
0108:         * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0109:         *         </a>
0110:         * @version 1.0
0111:         */
0112:        public class RequestManager extends AbstractStandardMBean implements 
0113:                XmlComponent, RequestManagerMBean {
0114:
0115:            //
0116:            // How is the code organized?
0117:            //
0118:            // 1. Member variables
0119:            // 2. Constructor(s)
0120:            // 3. Request handling
0121:            // 4. Transaction handling
0122:            // 5. Database backend management
0123:            // 6. Getter/Setter (possibly in alphabetical order)
0124:            //
0125:
0126:            /** begin timeout in ms */
0127:            protected long beginTimeout;
0128:
0129:            /** commit timeout in ms */
0130:            protected long commitTimeout;
0131:
0132:            /** rollback timeout in ms */
0133:            protected long rollbackTimeout;
0134:
0135:            /** The virtual database owning this Request Manager */
0136:            protected VirtualDatabase vdb;
0137:
0138:            /** The request scheduler to order and schedule requests */
0139:            protected AbstractScheduler scheduler;
0140:
0141:            /** An optional request cache to cache responses to SQL requests */
0142:            protected AbstractResultCache resultCache;
0143:
0144:            /** The request load balancer to use to send requests to the databases */
0145:            protected AbstractLoadBalancer loadBalancer;
0146:
0147:            /** An optional recovery log */
0148:            protected RecoveryLog recoveryLog;
0149:
0150:            /** The backup manager responsible for backup and restore of backends */
0151:            protected BackupManager backupManager;
0152:
0153:            /** The virtual database schema */
0154:            protected DatabaseSchema dbs;
0155:
0156:            /** <code>true</code> if schema is no more up-to-date and needs a refresh */
0157:            private boolean schemaIsDirty = false;
0158:
0159:            private boolean schemaIsStatic = false;
0160:
0161:            private boolean isCaseSensitiveParsing = false; // passed to request.parse() when no parsing cache is set
0162:
0163:            protected ParsingCache parsingCache = null; // optional: when null, requests are parsed directly
0164:
0165:            private MetadataCache metadataCache = null;
0166:
0167:            // SQL queries parsing granularity according to Scheduler, ResultCache and
0168:            // LoadBalancer required granularity
0169:            protected int schedulerParsingranularity = ParsingGranularities.NO_PARSING;
0170:
0171:            private int cacheParsingranularity = ParsingGranularities.NO_PARSING;
0172:
0173:            private int loadBalancerParsingranularity = ParsingGranularities.NO_PARSING;
0174:
0175:            protected int requiredParsingGranularity = ParsingGranularities.NO_PARSING; // maximum of the three granularities above
0176:
0177:            private long requestId = 0; // next id returned by getNextRequestId(); accessed under REQUEST_ID_SYNC_OBJECT
0178:
0179:            /** Logger for the "org.continuent.sequoia.enduser" category */
0180:            protected static Trace endUserLogger = Trace
0181:                    .getLogger("org.continuent.sequoia.enduser");
0182:
0183:            /**
0184:             * Hashtable&lt;Long, TransactionMetaData&gt;, the <code>Long</code> key is
0185:             * the same transaction ID as the <code>transactionId</code> field of its
0186:             * corresponding <code>TransactionMetaData</code> value.
0187:             */
0188:            // <pedantic>This Hashtable should be called transactionMetaData and the
0189:            // TransactionMetaData class should be renamed TransactionMetaDatum but
0190:            // 1/ I don't care
0191:            // 2/ I don't spell check my code :-)
0192:            // Jeff.
0193:            // </pedantic>
0194:            protected Hashtable transactionMetaDatas;
0195:
0196:            /**
0197:             * Hashtable&lt;Long, String&gt;, the <code>Long</code> being a transaction
0198:             * ID and its corresponding <code>String</code> being the name of a
0199:             * savepoint.
0200:             */
0201:            protected Hashtable tidSavepoints;
0202:
0203:            protected Trace logger = null; // assigned in initRequestManagerVariables(), named after the virtual database
0204:
0205:            private BackendStateListener backendStateListener;
0206:
0207:            //
0208:            // Constructors
0209:            //
0210:
0211:            /**
0212:             * Creates a new <code>RequestManager</code> instance.
0213:             *
0214:             * @param vdb the virtual database this request manager belongs to
0215:             * @param scheduler the Request Scheduler to use
0216:             * @param cache a Query Cache implementation
0217:             * @param loadBalancer the Request Load Balancer to use
0218:             * @param recoveryLog the Log Recovery to use
0219:             * @param beginTimeout timeout in seconds for begin
0220:             * @param commitTimeout timeout in seconds for commit
0221:             * @param rollbackTimeout timeout in seconds for rollback
0222:             * @throws SQLException if an error occurs
0223:             * @throws NotCompliantMBeanException if the MBean is not JMX compliant
0224:             */
0225:            public RequestManager(VirtualDatabase vdb,
0226:                    AbstractScheduler scheduler, AbstractResultCache cache,
0227:                    AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog,
0228:                    long beginTimeout, long commitTimeout, long rollbackTimeout)
0229:                    throws SQLException, NotCompliantMBeanException {
0230:                super (RequestManagerMBean.class);
0231:                this .vdb = vdb;
0232:                assignAndCheckSchedulerLoadBalancerValidity(scheduler,
0233:                        loadBalancer);
0234:                // requiredParsingGranularity is the maximum of each component granularity
0235:                this .resultCache = cache;
0236:                if (resultCache != null) {
0237:                    cacheParsingranularity = cache.getParsingGranularity();
0238:                    if (cacheParsingranularity > requiredParsingGranularity)
0239:                        requiredParsingGranularity = cacheParsingranularity;
0240:                }
0241:                setRecoveryLog(recoveryLog);
0242:                initRequestManagerVariables(vdb, beginTimeout, commitTimeout,
0243:                        rollbackTimeout);
0244:                logger.info(Translate.get("requestmanager.parsing.granularity",
0245:                        ParsingGranularities
0246:                                .getInformation(requiredParsingGranularity)));
0247:
0248:                if (MBeanServerManager.isJmxEnabled()) {
0249:                    try {
0250:                        MBeanServerManager.registerMBean(this , JmxConstants
0251:                                .getRequestManagerObjectName(vdb
0252:                                        .getVirtualDatabaseName()));
0253:
0254:                    } catch (Exception e) {
0255:                        logger
0256:                                .error(Translate
0257:                                        .get("jmx.failed.register.mbean.requestmanager"));
0258:                    }
0259:                }
0260:            }
0261:
0262:            /**
0263:             * Retrieve the last known checkpoint from the recovery log and set it for
0264:             * each backend.
0265:             */
0266:            public void initBackendsLastKnownCheckpointFromRecoveryLog() {
0267:                if (recoveryLog == null)
0268:                    return;
0269:                String databaseName = vdb.getVirtualDatabaseName();
0270:                ArrayList backends = vdb.getBackends();
0271:                int size = backends.size();
0272:                DatabaseBackend backend;
0273:                BackendRecoveryInfo info;
0274:                for (int i = 0; i < size; i++) {
0275:                    backend = (DatabaseBackend) backends.get(i);
0276:                    try {
0277:                        info = recoveryLog.getBackendRecoveryInfo(databaseName,
0278:                                backend.getName());
0279:                        String checkpoint = info.getCheckpoint();
0280:                        if ((checkpoint == null) || ("".equals(checkpoint))) { // No last known checkpoint
0281:                            if (info.getBackendState() != BackendState.UNKNOWN) {
0282:                                String msg = "Backend "
0283:                                        + backend.getName()
0284:                                        + " was not properly stopped, manual recovery will be needed ("
0285:                                        + info + ")";
0286:                                logger.warn(msg);
0287:                                throw new SQLException(msg);
0288:                            }
0289:                        }
0290:                        backend.setLastKnownCheckpoint(checkpoint);
0291:                    } catch (SQLException e) {
0292:                        if (logger.isDebugEnabled()) {
0293:                            logger.debug(e.getMessage(), e);
0294:                        }
0295:                        backend.setState(BackendState.UNKNOWN);
0296:                        // Also sets backend.setLastKnownCheckpoint(null);
0297:                    }
0298:                }
0299:            }
0300:
0301:            /**
0302:             * Check that Scheduler and Load Balancer are not null and have compatible
0303:             * RAIDb levels.
0304:             *
0305:             * @param aScheduler the scheduler to set
0306:             * @param aLoadBalancer the load balancer to set
0307:             * @throws SQLException if an error occurs
0308:             */
0309:            private void assignAndCheckSchedulerLoadBalancerValidity(
0310:                    AbstractScheduler aScheduler,
0311:                    AbstractLoadBalancer aLoadBalancer) throws SQLException {
0312:                if (aScheduler == null)
0313:                    throw new SQLException(Translate
0314:                            .get("requestmanager.null.scheduler"));
0315:
0316:                if (aLoadBalancer == null)
0317:                    throw new SQLException(Translate
0318:                            .get("requestmanager.null.loadbalancer"));
0319:
0320:                if (aScheduler.getRAIDbLevel() != aLoadBalancer.getRAIDbLevel())
0321:                    throw new SQLException(Translate.get(
0322:                            "requestmanager.incompatible.raidb.levels",
0323:                            new String[] { "" + aScheduler.getRAIDbLevel(),
0324:                                    "" + aLoadBalancer.getRAIDbLevel() }));
0325:
0326:                // requiredParsingGranularity is the maximum of each component granularity
0327:                setScheduler(aScheduler);
0328:                schedulerParsingranularity = aScheduler.getParsingGranularity();
0329:                requiredParsingGranularity = schedulerParsingranularity;
0330:                setLoadBalancer(aLoadBalancer);
0331:                loadBalancerParsingranularity = aLoadBalancer
0332:                        .getParsingGranularity();
0333:                if (loadBalancerParsingranularity > requiredParsingGranularity)
0334:                    requiredParsingGranularity = loadBalancerParsingranularity;
0335:            }
0336:
0337:            /**
0338:             * Get the parsing from the parsing cache or parse the query.
0339:             *
0340:             * @param request the request to be parsed (request will be parsed after this
0341:             *          call)
0342:             * @throws SQLException if an error occurs during parsing
0343:             */
0344:            public void getParsingFromCacheOrParse(AbstractRequest request)
0345:                    throws SQLException {
0346:                /*
0347:                 * If we need to parse the request, try to get the parsing from the cache.
0348:                 * Note that if we have a cache miss but backgroundParsing has been turned
0349:                 * on, then this call will start a ParsedThread in background.
0350:                 */
0351:                if ((requiredParsingGranularity != ParsingGranularities.NO_PARSING)
0352:                        && (!request.isParsed())) {
0353:                    if (parsingCache == null)
0354:                        request.parse(getDatabaseSchema(),
0355:                                requiredParsingGranularity,
0356:                                isCaseSensitiveParsing);
0357:                    else
0358:                        parsingCache
0359:                                .getParsingFromCacheAndParseIfMissing(request);
0360:                }
0361:            }
0362:
0363:            /**
0364:             * Method initRequestManagerVariables.
0365:             *
0366:             * @param vdb the virtual database
0367:             * @param beginTimeout timeout for begin operation
0368:             * @param commitTimeout timeout for commit operation
0369:             * @param rollbackTimeout timeout for rollback operation
0370:             */
0371:            private void initRequestManagerVariables(VirtualDatabase vdb,
0372:                    long beginTimeout, long commitTimeout, long rollbackTimeout) {
0373:                this .transactionMetaDatas = new Hashtable();
0374:                this .tidSavepoints = new Hashtable();
0375:                this .beginTimeout = beginTimeout;
0376:                this .commitTimeout = commitTimeout;
0377:                this .rollbackTimeout = rollbackTimeout;
0378:                this .vdb = vdb;
0379:                logger = Trace
0380:                        .getLogger("org.continuent.sequoia.controller.RequestManager."
0381:                                + vdb.getDatabaseName());
0382:            }
0383:
0384:            //
0385:            // Request Handling
0386:            //
0387:
0388:            /**
0389:             * Close the given persistent connection.
0390:             *
0391:             * @param login login to use to retrieve the right connection pool
0392:             * @param persistentConnectionId id of the persistent connection to close
0393:             */
0394:            public void closePersistentConnection(String login,
0395:                    long persistentConnectionId) {
0396:                VirtualDatabaseWorkerThread vdbwt = vdb
0397:                        .getVirtualDatabaseWorkerThreadForPersistentConnection(persistentConnectionId);
0398:                if (vdbwt != null)
0399:                    vdbwt.notifyClose(persistentConnectionId);
0400:
0401:                try {
0402:                    scheduler.scheduleClosePersistentConnection();
0403:
0404:                    // No need to wait for task completion (cannot fail), so log right away.
0405:                    if (recoveryLog != null)
0406:                        recoveryLog.logClosePersistentConnection(login,
0407:                                persistentConnectionId);
0408:
0409:                    loadBalancer.closePersistentConnection(login,
0410:                            persistentConnectionId);
0411:                } catch (NoMoreBackendException ignore) {
0412:                    // Removes ugly trace as reported in SEQUOIA-390
0413:                } catch (SQLException e) {
0414:                    logger.warn("Failed to close persistent connection "
0415:                            + persistentConnectionId, e);
0416:                } finally {
0417:                    scheduler
0418:                            .closePersistentConnectionCompleted(persistentConnectionId);
0419:                }
0420:            }
0421:
0422:            /**
0423:             * Returns true if the virtual database has opened the given persistent
0424:             * connection.
0425:             *
0426:             * @param persistentConnectionId id of the persistent connection to check
0427:             * @return true if the connection is open
0428:             */
0429:            public boolean hasPersistentConnection(long persistentConnectionId) {
0430:                return scheduler
0431:                        .hasPersistentConnection(persistentConnectionId);
0432:            }
0433:
0434:            /**
0435:             * Open the given persistent connection.
0436:             *
0437:             * @param login login to use to retrieve the right connection pool
0438:             * @param persistentConnectionId id of the persistent connection to open
0439:             * @param dmsg message in the total order queue if applicable (null otherwise)
0440:             * @throws SQLException An exception can be thrown when it failed to open a
0441:             *           connection
0442:             */
0443:            public void openPersistentConnection(String login,
0444:                    long persistentConnectionId,
0445:                    DistributedOpenPersistentConnection dmsg)
0446:                    throws SQLException {
0447:                boolean success = false;
0448:                long entryId = -1;
0449:                try {
0450:                    scheduler.scheduleOpenPersistentConnection(dmsg);
0451:
0452:                    if (recoveryLog != null)
0453:                        entryId = recoveryLog.logOpenPersistentConnection(
0454:                                login, persistentConnectionId);
0455:
0456:                    loadBalancer.openPersistentConnection(login,
0457:                            persistentConnectionId);
0458:                    success = true;
0459:                } catch (NoMoreBackendException e) {
0460:                    throw e;
0461:                } catch (SQLException e) {
0462:                    logger.warn("Failed to open persistent connection "
0463:                            + persistentConnectionId, e);
0464:                    throw e;
0465:                } finally {
0466:                    if (recoveryLog != null)
0467:                        recoveryLog.logRequestCompletion(entryId, success, 0);
0468:                    scheduler.openPersistentConnectionCompleted(
0469:                            persistentConnectionId, success);
0470:                }
0471:            }
0472:
0473:            private final Object REQUEST_ID_SYNC_OBJECT = new Object(); // monitor held by initializeRequestId() and getNextRequestId() while they access requestId
0474:
0475:            /**
0476:             * Initialize the request id with the given value (usually retrieved from the
0477:             * recovery log).
0478:             *
0479:             * @param requestId new current request identifier
0480:             */
0481:            public void initializeRequestId(long requestId) {
0482:                synchronized (REQUEST_ID_SYNC_OBJECT) {
0483:                    // Use the max operator as a safeguard: IDs may have been delivered but
0484:                    // not logged yet.
0485:                    this .requestId = Math.max(this .requestId + 1, requestId);
0486:                }
0487:            }
0488:
0489:            /**
0490:             * Return the next request identifier (monotically increasing number).
0491:             *
0492:             * @return a request identifier
0493:             */
0494:            public long getNextRequestId() {
0495:                synchronized (REQUEST_ID_SYNC_OBJECT) {
0496:                    return requestId++;
0497:                }
0498:            }
0499:
0500:            /**
0501:             * Perform a read request and return the reply. Call first the scheduler, then
0502:             * the cache (if defined) and finally the load balancer.
0503:             *
0504:             * @param request the request to execute
0505:             * @return a <code>ControllerResultSet</code> value
0506:             * @exception SQLException if an error occurs
0507:             */
0508:            public ControllerResultSet statementExecuteQuery(
0509:                    SelectRequest request) throws SQLException {
0510:                // Sanity check
0511:                TransactionMetaData tm = null;
0512:                if (!request.isAutoCommit()) { // Check that the transaction has been started
0513:                    Long tid = new Long(request.getTransactionId());
0514:                    try {
0515:                        tm = getTransactionMetaData(tid);
0516:                    } catch (SQLException e) {
0517:                        throw new SQLException(Translate.get(
0518:                                "transaction.not.started", tid));
0519:                    }
0520:                }
0521:
0522:                getParsingFromCacheOrParse(request);
0523:
0524:                //
0525:                // SCHEDULER
0526:                //
0527:
0528:                if (logger.isDebugEnabled())
0529:                    logger.debug(Translate.get(
0530:                            "requestmanager.read.request.schedule",
0531:                            new String[] {
0532:                                    String.valueOf(request.getId()),
0533:                                    request.getSqlShortForm(vdb
0534:                                            .getSqlShortFormLength()) }));
0535:
0536:                // Wait for the scheduler to give us the authorization to execute
0537:                scheduler.scheduleReadRequest(request);
0538:
0539:                //
0540:                // CACHE
0541:                //
0542:
0543:                ControllerResultSet result = null;
0544:                try { // Check cache if any
0545:                    if ((resultCache != null) && (!request.isMustBroadcast())) {
0546:                        if (logger.isDebugEnabled())
0547:                            logger
0548:                                    .debug(Translate
0549:                                            .get(
0550:                                                    "requestmanager.read.request.cache.get",
0551:                                                    new String[] {
0552:                                                            String
0553:                                                                    .valueOf(request
0554:                                                                            .getId()),
0555:                                                            request
0556:                                                                    .getSqlShortForm(vdb
0557:                                                                            .getSqlShortFormLength()) }));
0558:
0559:                        AbstractResultCacheEntry qce = resultCache
0560:                                .getFromCache(request, true);
0561:                        if (qce != null) {
0562:                            result = qce.getResult();
0563:                            if (result != null) { // Cache hit !
0564:                                if (vdb.getSQLMonitor() != null)
0565:                                    vdb.getSQLMonitor().logCacheHit(request);
0566:
0567:                                scheduler.readCompleted(request);
0568:                                return result;
0569:                            }
0570:                        }
0571:                    }
0572:
0573:                    //
0574:                    // LOAD BALANCER
0575:                    //
0576:
0577:                    if (logger.isDebugEnabled())
0578:                        logger.debug(Translate.get(
0579:                                "requestmanager.read.request.balance",
0580:                                new String[] {
0581:                                        String.valueOf(request.getId()),
0582:                                        request.getSqlShortForm(vdb
0583:                                                .getSqlShortFormLength()) }));
0584:
0585:                    // At this point, we have a result cache miss.
0586:
0587:                    // Send the request to the load balancer
0588:                    result = loadBalancer.statementExecuteQuery(request,
0589:                            metadataCache);
0590:
0591:                    //
0592:                    // UPDATES & NOTIFICATIONS
0593:                    //
0594:
0595:                    // Update cache
0596:                    if ((resultCache != null)
0597:                            && (request.getCacheAbility() != RequestType.UNCACHEABLE)) {
0598:                        if (logger.isDebugEnabled())
0599:                            logger
0600:                                    .debug(Translate
0601:                                            .get(
0602:                                                    "requestmanager.read.request.cache.update",
0603:                                                    new String[] {
0604:                                                            String
0605:                                                                    .valueOf(request
0606:                                                                            .getId()),
0607:                                                            request
0608:                                                                    .getSqlShortForm(vdb
0609:                                                                            .getSqlShortFormLength()) }));
0610:
0611:                        resultCache.addToCache(request, result);
0612:                        if (tm != null)
0613:                            tm.setAltersQueryResultCache(true);
0614:                    }
0615:                } catch (Exception failed) {
0616:                    if (resultCache != null)
0617:                        resultCache.removeFromPendingQueries(request);
0618:                    if (failed instanceof  NoMoreBackendException)
0619:                        throw (NoMoreBackendException) failed;
0620:                    String msg = Translate.get("requestmanager.request.failed",
0621:                            new String[] {
0622:                                    request.getSqlShortForm(vdb
0623:                                            .getSqlShortFormLength()),
0624:                                    failed.getMessage() });
0625:                    if (failed instanceof  RuntimeException)
0626:                        logger.warn(msg, failed);
0627:                    else
0628:                        logger.warn(msg);
0629:                    if (failed instanceof  SQLException)
0630:                        throw (SQLException) failed;
0631:
0632:                    throw new SQLException(msg);
0633:                } finally {
0634:                    // Notify scheduler of completion
0635:                    scheduler.readCompleted(request);
0636:                }
0637:                return result;
0638:            }
0639:
0640:            /**
0641:             * Perform a write request and return the number of rows affected Call first
0642:             * the scheduler (if defined), then notify the cache (if defined) and finally
0643:             * call the load balancer.
0644:             *
0645:             * @param request the request to execute
0646:             * @return number of rows affected
0647:             * @exception SQLException if an error occurs
0648:             */
0649:            public ExecuteUpdateResult statementExecuteUpdate(
0650:                    AbstractWriteRequest request) throws SQLException {
0651:                boolean hasBeenScheduled = false, schedulerHasBeenNotified = false;
0652:                try {
0653:                    scheduleExecWriteRequest(request);
0654:                    hasBeenScheduled = true;
0655:                    ExecuteUpdateResult execWriteRequestResult = null;
0656:                    try {
0657:                        execWriteRequestResult = loadBalanceStatementExecuteUpdate(request);
0658:                    } catch (AllBackendsFailedException e) {
0659:                        String msg = Translate
0660:                                .get("requestmanager.write.request.failed.unexpected");
0661:                        logger.fatal(msg, e);
0662:                        endUserLogger.fatal(msg);
0663:                        throw new RuntimeException(msg, e);
0664:                    }
0665:                    updateAndNotifyExecWriteRequest(request,
0666:                            execWriteRequestResult.getUpdateCount());
0667:                    schedulerHasBeenNotified = true;
0668:                    return execWriteRequestResult;
0669:                } finally {
0670:                    if (hasBeenScheduled && !schedulerHasBeenNotified)
0671:                        scheduler.writeCompleted(request);
0672:                }
0673:            }
0674:
0675:            /**
0676:             * Perform a write request and return the auto generated keys. Call first the
0677:             * scheduler (if defined), then notify the cache (if defined) and finally call
0678:             * the load balancer.
0679:             *
0680:             * @param request the request to execute
0681:             * @return auto generated keys.
0682:             * @exception SQLException if an error occurs
0683:             */
0684:            public GeneratedKeysResult statementExecuteUpdateWithKeys(
0685:                    AbstractWriteRequest request) throws SQLException {
0686:                boolean hasBeenScheduled = false, schedulerHasBeenNotified = false;
0687:                try {
0688:                    scheduleExecWriteRequest(request);
0689:                    hasBeenScheduled = true;
0690:                    GeneratedKeysResult execWriteRequestWithKeysResult = null;
0691:                    try {
0692:                        execWriteRequestWithKeysResult = loadBalanceStatementExecuteUpdateWithKeys(request);
0693:                    } catch (AllBackendsFailedException e) {
0694:                        String msg = Translate
0695:                                .get("requestmanager.write.request.keys.failed.unexpected");
0696:                        logger.fatal(msg, e);
0697:                        endUserLogger.fatal(msg);
0698:                        throw new RuntimeException(msg, e);
0699:                    }
0700:                    updateAndNotifyExecWriteRequest(request,
0701:                            execWriteRequestWithKeysResult.getUpdateCount());
0702:                    schedulerHasBeenNotified = true;
0703:                    return execWriteRequestWithKeysResult;
0704:                } finally {
0705:                    if (hasBeenScheduled && !schedulerHasBeenNotified)
0706:                        scheduler.writeCompleted(request);
0707:                }
0708:            }
0709:
0710:            /**
0711:             * Execute a call to CallableStatement.execute() and returns a suite of
0712:             * updateCount and/or ResultSets.
0713:             *
0714:             * @param request the stored procedure to execute
0715:             * @return an <code>ExecuteResult</code> object
0716:             * @throws AllBackendsFailedException if all backends failed to execute the
0717:             *           stored procedure
0718:             * @exception SQLException if an error occurs
0719:             */
0720:            public ExecuteResult statementExecute(AbstractRequest request)
0721:                    throws AllBackendsFailedException, SQLException {
0722:                // Create a fake stored procedure
0723:                StoredProcedure proc = new StoredProcedure(request
0724:                        .getSqlOrTemplate(), request.getEscapeProcessing(),
0725:                        request.getTimeout(), request.getLineSeparator());
0726:                proc.setIsAutoCommit(request.isAutoCommit());
0727:                proc.setTransactionId(request.getTransactionId());
0728:                proc.setTransactionIsolation(request.getTransactionIsolation());
0729:                proc.setId(request.getId());
0730:                proc.setLogin(request.getLogin());
0731:                proc.setPreparedStatementParameters(request
0732:                        .getPreparedStatementParameters());
0733:                proc.setTimeout(request.getTimeout());
0734:                proc.setMaxRows(request.getMaxRows());
0735:                proc.setPersistentConnection(request.isPersistentConnection());
0736:                proc.setPersistentConnectionId(request
0737:                        .getPersistentConnectionId());
0738:
0739:                boolean hasBeenScheduled = false;
0740:                try {
0741:                    ExecuteResult result;
0742:
0743:                    // Schedule as a stored procedure
0744:                    scheduleStoredProcedure(proc);
0745:                    hasBeenScheduled = true;
0746:
0747:                    if (logger.isDebugEnabled())
0748:                        logger.debug(Translate.get(
0749:                                "requestmanager.write.stored.procedure",
0750:                                new String[] {
0751:                                        String.valueOf(request.getId()),
0752:                                        request.getSqlShortForm(vdb
0753:                                                .getSqlShortFormLength()) }));
0754:
0755:                    result = loadBalanceStatementExecute(proc);
0756:
0757:                    updateRecoveryLogFlushCacheAndRefreshSchema(proc);
0758:
0759:                    // Notify scheduler of completion
0760:                    scheduler.storedProcedureCompleted(proc);
0761:
0762:                    return result;
0763:                } catch (AllBackendsFailedException e) {
0764:                    throw e;
0765:                } finally {
0766:                    if (hasBeenScheduled)
0767:                        scheduler.storedProcedureCompleted(proc);
0768:                }
0769:            }
0770:
0771:            /**
0772:             * Schedule a request for execution.
0773:             *
0774:             * @param request the request to execute
0775:             * @throws SQLException if an error occurs
0776:             */
0777:            public void scheduleExecWriteRequest(AbstractWriteRequest request)
0778:                    throws SQLException {
0779:                // Sanity check
0780:                if (!request.isAutoCommit()) { // Check that the transaction has been
0781:                    // started
0782:                    long tid = request.getTransactionId();
0783:                    if (!transactionMetaDatas.containsKey(new Long(tid)))
0784:                        throw new SQLException(Translate.get(
0785:                                "transaction.not.started", tid));
0786:                }
0787:
0788:                getParsingFromCacheOrParse(request);
0789:
0790:                //
0791:                // SCHEDULER
0792:                //
0793:
0794:                if (logger.isDebugEnabled())
0795:                    logger.debug(Translate.get(
0796:                            "requestmanager.write.request.schedule",
0797:                            new String[] {
0798:                                    String.valueOf(request.getId()),
0799:                                    request.getSqlShortForm(vdb
0800:                                            .getSqlShortFormLength()) }));
0801:
0802:                // Wait for the scheduler to give us the authorization to execute
0803:                try {
0804:                    scheduler.scheduleWriteRequest(request);
0805:                } catch (RollbackException e) { // Something bad happened and we need to rollback this transaction
0806:                    rollback(request.getTransactionId(), true);
0807:                    throw new SQLException(e.getMessage());
0808:                }
0809:            }
0810:
0811:            /**
0812:             * Send the given query to the load balancer. If the request fails, the
0813:             * scheduler is properly notified.
0814:             *
0815:             * @param request the request to execute
0816:             * @return update count and auto-generated keys
0817:             * @throws AllBackendsFailedException if all backends failed to execute the
0818:             *           query
0819:             * @throws NoMoreBackendException if no backends are left to execute the
0820:             *           request
0821:             * @throws SQLException if an error occurs
0822:             */
0823:            public GeneratedKeysResult loadBalanceStatementExecuteUpdateWithKeys(
0824:                    AbstractWriteRequest request)
0825:                    throws AllBackendsFailedException, NoMoreBackendException,
0826:                    SQLException {
0827:                if (logger.isDebugEnabled())
0828:                    logger.debug(Translate.get(
0829:                            "requestmanager.write.request.balance",
0830:                            new String[] {
0831:                                    String.valueOf(request.getId()),
0832:                                    String.valueOf(request.getTransactionId()),
0833:                                    request.getSqlShortForm(vdb
0834:                                            .getSqlShortFormLength()) }));
0835:
0836:                try { // Send the request to the load balancer
0837:                    return loadBalancer.statementExecuteUpdateWithKeys(request,
0838:                            metadataCache);
0839:                } catch (Exception failed) {
0840:                    if (!vdb.isDistributed()) { // Notify log of failure
0841:                        if (recoveryLog != null)
0842:                            recoveryLog.logRequestCompletion(
0843:                                    request.getLogId(), false, request
0844:                                            .getExecTimeInMs());
0845:                    }
0846:
0847:                    String msg = Translate.get("requestmanager.request.failed",
0848:                            new String[] {
0849:                                    request.getSqlShortForm(vdb
0850:                                            .getSqlShortFormLength()),
0851:                                    failed.getMessage() });
0852:                    if (failed instanceof  RuntimeException)
0853:                        logger.warn(msg, failed);
0854:
0855:                    if (failed instanceof  AllBackendsFailedException)
0856:                        throw (AllBackendsFailedException) failed;
0857:                    else if (failed instanceof  SQLException)
0858:                        throw (SQLException) failed;
0859:                    else if (failed instanceof  NoMoreBackendException)
0860:                        throw (NoMoreBackendException) failed;
0861:                    else
0862:                        throw new SQLException(msg);
0863:                }
0864:            }
0865:
0866:            /**
0867:             * Send the given query to the load balancer. If the request fails, the
0868:             * scheduler is properly notified.
0869:             *
0870:             * @param request the request to execute
0871:             * @throws AllBackendsFailedException if all backends failed to execute the
0872:             *           query
0873:             * @exception NoMoreBackendException if no backends are left to execute the
0874:             *              request
0875:             * @throws SQLException if an error occurs
0876:             * @return number of modified lines
0877:             */
0878:            public ExecuteUpdateResult loadBalanceStatementExecuteUpdate(
0879:                    AbstractWriteRequest request)
0880:                    throws AllBackendsFailedException, NoMoreBackendException,
0881:                    SQLException {
0882:                if (logger.isDebugEnabled())
0883:                    logger.debug(Translate.get(
0884:                            "requestmanager.write.request.balance",
0885:                            new String[] {
0886:                                    String.valueOf(request.getId()),
0887:                                    String.valueOf(request.getTransactionId()),
0888:                                    request.getSqlShortForm(vdb
0889:                                            .getSqlShortFormLength()) }));
0890:
0891:                try { // Send the request to the load balancer
0892:                    if (request.isUpdate() && (resultCache != null)) { // Try the optimization if we try to update values that are already
0893:                        // up-to-date. Warnings will be lost anyway so forget it
0894:                        if (!resultCache
0895:                                .isUpdateNecessary((UpdateRequest) request))
0896:                            return new ExecuteUpdateResult(0);
0897:                    }
0898:                    return loadBalancer.statementExecuteUpdate(request);
0899:                } catch (Exception failed) {
0900:                    if (!vdb.isDistributed()) { // Notify log of failure
0901:                        if (recoveryLog != null)
0902:                            recoveryLog.logRequestCompletion(
0903:                                    request.getLogId(), false, request
0904:                                            .getExecTimeInMs());
0905:                    }
0906:
0907:                    String msg = Translate.get("requestmanager.request.failed",
0908:                            new String[] {
0909:                                    request.getSqlShortForm(vdb
0910:                                            .getSqlShortFormLength()),
0911:                                    failed.getMessage() });
0912:
0913:                    // Error logging
0914:                    if (failed instanceof  RuntimeException)
0915:                        logger.warn(msg, failed);
0916:
0917:                    // Rethrow exception
0918:                    if (failed instanceof  AllBackendsFailedException)
0919:                        throw (AllBackendsFailedException) failed;
0920:                    else if (failed instanceof  SQLException)
0921:                        throw (SQLException) failed;
0922:                    else if (failed instanceof  NoMoreBackendException)
0923:                        throw (NoMoreBackendException) failed;
0924:                    else
0925:                        throw new SQLException(msg);
0926:                }
0927:            }
0928:
0929:            /**
0930:             * Execute a request using Statement.execute() on the load balancer. Note that
0931:             * we flush the cache before calling the load balancer.
0932:             *
0933:             * @param request the request to execute.
0934:             * @return an <code>ExecuteResult</code> object
0935:             * @throws SQLException if an error occurs
0936:             * @throws AllBackendsFailedException if all backends failed to execute the
0937:             *           stored procedure
0938:             */
0939:            public ExecuteResult loadBalanceStatementExecute(
0940:                    AbstractRequest request) throws AllBackendsFailedException,
0941:                    SQLException {
0942:                ExecuteResult result;
0943:                //
0944:                // CACHE
0945:                //
0946:
0947:                // Flush cache (if any) before
0948:                if (resultCache != null) {
0949:                    resultCache.flushCache();
0950:                }
0951:
0952:                //
0953:                // LOAD BALANCER
0954:                //
0955:
0956:                try { // Send the request to the load balancer
0957:                    result = loadBalancer.statementExecute(request,
0958:                            metadataCache);
0959:                    return result;
0960:                } catch (Exception failed) {
0961:                    if (!vdb.isDistributed()) { // Notify log of failure
0962:                        if (recoveryLog != null)
0963:                            recoveryLog.logRequestCompletion(
0964:                                    request.getLogId(), false, request
0965:                                            .getExecTimeInMs());
0966:                    }
0967:
0968:                    String msg = Translate.get("requestmanager.request.failed",
0969:                            new String[] {
0970:                                    request.getSqlShortForm(vdb
0971:                                            .getSqlShortFormLength()),
0972:                                    failed.getMessage() });
0973:                    if (failed instanceof  RuntimeException)
0974:                        logger.warn(msg, failed);
0975:
0976:                    if (failed instanceof  AllBackendsFailedException)
0977:                        throw (AllBackendsFailedException) failed;
0978:                    else if (failed instanceof  SQLException)
0979:                        throw (SQLException) failed;
0980:                    else if (failed instanceof  NoMoreBackendException)
0981:                        throw (NoMoreBackendException) failed;
0982:                    else
0983:                        throw (SQLException) new SQLException(msg)
0984:                                .initCause(failed);
0985:                }
0986:            }
0987:
0988:            /**
0989:             * Update the recovery log, cache, update the database schema if needed and
0990:             * finally notify the scheduler. Note that if an error occurs, the scheduler
0991:             * is always notified.
0992:             *
0993:             * @param request the request to execute
0994:             * @param updateCount the update count if query was executed with
0995:             *          executeUpdate(), -1 otherwise
0996:             * @throws SQLException if an error occurs
0997:             */
0998:            public void updateAndNotifyExecWriteRequest(
0999:                    AbstractWriteRequest request, int updateCount)
1000:                    throws SQLException {
1001:                try {
1002:                    TransactionMetaData tm = null;
1003:                    if (!request.isAutoCommit()) {
1004:                        /*
1005:                         * This is a write transaction, update the transactional context so that
1006:                         * commit/rollback for that transaction will be logged.
1007:                         */
1008:                        tm = getTransactionMetaData(new Long(request
1009:                                .getTransactionId()));
1010:                        tm.setReadOnly(false);
1011:                    }
1012:
1013:                    // Update the recovery log (if there is one and we are not using SingleDB)
1014:                    if ((recoveryLog != null)
1015:                            && (loadBalancer.getRAIDbLevel() != RAIDbLevels.SingleDB))
1016:                        recoveryLog.logRequestExecuteUpdateCompletion(request
1017:                                .getLogId(), true, updateCount, request
1018:                                .getExecTimeInMs());
1019:
1020:                    if (request.altersSomething()) {
1021:                        // Start to update the query result cache first if needed (must be done
1022:                        // before altering the schema)
1023:                        if (request.altersQueryResultCache()
1024:                                && (resultCache != null)) {
1025:                            if (logger.isDebugEnabled())
1026:                                logger
1027:                                        .debug(Translate
1028:                                                .get(
1029:                                                        "requestmanager.write.request.cache.update",
1030:                                                        new String[] {
1031:                                                                String
1032:                                                                        .valueOf(request
1033:                                                                                .getId()),
1034:                                                                request
1035:                                                                        .getSqlShortForm(vdb
1036:                                                                                .getSqlShortFormLength()) }));
1037:
1038:                            if (tm != null)
1039:                                tm.setAltersQueryResultCache(true);
1040:                            resultCache.writeNotify(request);
1041:                        }
1042:
1043:                        // Update the schema if needed
1044:                        if (request.altersDatabaseSchema()
1045:                                && (requiredParsingGranularity != ParsingGranularities.NO_PARSING)) {
1046:                            if (tm != null)
1047:                                tm.setAltersDatabaseSchema(true);
1048:
1049:                            DatabaseSchema currentSchema = getDatabaseSchema();
1050:                            if (currentSchema == null) {
1051:                                // the schema can be set to null during force disable backend.
1052:                                // Don't try to update it (see UNICLUSTER-246), just flag it as
1053:                                // to be refreshed
1054:                                setSchemaIsDirty(true);
1055:                            } else {
1056:                                if (request.isCreate()) { // Add the table to the schema
1057:                                    CreateRequest createRequest = (CreateRequest) request;
1058:                                    if (createRequest.getDatabaseTable() != null) {
1059:                                        currentSchema
1060:                                                .addTable(new DatabaseTable(
1061:                                                        ((CreateRequest) request)
1062:                                                                .getDatabaseTable()));
1063:                                        if (logger.isDebugEnabled())
1064:                                            logger
1065:                                                    .debug(Translate
1066:                                                            .get(
1067:                                                                    "requestmanager.schema.add.table",
1068:                                                                    request
1069:                                                                            .getTableName()));
1070:                                        // TODO : right now, we ask a complete schema refresh
1071:                                        // Optimization, we should refresh only this table
1072:                                        setSchemaIsDirty(true);
1073:                                    } else
1074:                                        // Some other create statement that modifies the schema, force
1075:                                        // refresh
1076:                                        setSchemaIsDirty(true);
1077:                                } else if (request.isDrop()) { // Delete the table from the schema
1078:                                    SortedSet tables = ((DropRequest) request)
1079:                                            .getTablesToDrop();
1080:                                    if (tables != null) { // Tables to drop !
1081:                                        for (Iterator iter = tables.iterator(); iter
1082:                                                .hasNext();) {
1083:                                            String tableName = (String) iter
1084:                                                    .next();
1085:                                            DatabaseTable table = currentSchema
1086:                                                    .getTable(tableName);
1087:                                            if (table == null) {
1088:                                                // Table not found, force refresh
1089:                                                setSchemaIsDirty(true);
1090:                                            } else { // Table found, let's try to update the schema
1091:                                                if (currentSchema
1092:                                                        .removeTable(table)) {
1093:                                                    if (logger.isDebugEnabled())
1094:                                                        logger
1095:                                                                .debug(Translate
1096:                                                                        .get(
1097:                                                                                "requestmanager.schema.remove.table",
1098:                                                                                table
1099:                                                                                        .getName()));
1100:
1101:                                                    // Remove table from depending tables
1102:                                                    if (logger.isDebugEnabled())
1103:                                                        logger
1104:                                                                .debug("Removing table '"
1105:                                                                        + table
1106:                                                                                .getName()
1107:                                                                        + "' from dependending tables in request manager database schema");
1108:                                                    currentSchema
1109:                                                            .removeTableFromDependingTables(table);
1110:                                                } else {
1111:                                                    // Table not found, force refresh
1112:                                                    setSchemaIsDirty(true);
1113:                                                }
1114:                                            }
1115:                                        }
1116:                                    }
1117:                                } else if (request.isAlter()
1118:                                        && (requiredParsingGranularity > ParsingGranularities.TABLE)) { // Add or drop the column from the table
1119:                                    AlterRequest req = (AlterRequest) request;
1120:                                    DatabaseTable alteredTable = currentSchema
1121:                                            .getTable(req.getTableName());
1122:                                    if ((alteredTable != null)
1123:                                            && (req.getColumn() != null)) {
1124:                                        if (req.isDrop())
1125:                                            alteredTable.removeColumn(req
1126:                                                    .getColumn().getName());
1127:                                        else if (req.isAdd())
1128:                                            alteredTable.addColumn(req
1129:                                                    .getColumn());
1130:                                        else
1131:                                            // Unsupported, force refresh
1132:                                            setSchemaIsDirty(true);
1133:                                    } else
1134:                                        // Table not found, force refresh
1135:                                        setSchemaIsDirty(true);
1136:                                } else
1137:                                    // Unsupported, force refresh
1138:                                    setSchemaIsDirty(true);
1139:                            }
1140:                        }
1141:                        if (request.altersMetadataCache()
1142:                                && (metadataCache != null)) {
1143:                            if (tm != null)
1144:                                tm.setAltersMetadataCache(true);
1145:
1146:                            metadataCache.flushCache();
1147:                        }
1148:
1149:                        // The following modifications are not specifically handled since there
1150:                        // is no cache for such structure yet
1151:
1152:                        // if (request.altersAggregateList())
1153:                        // ;
1154:                        // if (request.altersDatabaseCatalog())
1155:                        // ;
1156:                        // if (request.altersStoredProcedureList())
1157:                        // ;
1158:                        // if (request.altersUserDefinedTypes())
1159:                        // ;
1160:                        // if (request.altersUsers())
1161:                        // ;
1162:
1163:                    }
1164:                } catch (Exception failed) {
1165:                    logger.fatal("---");
1166:                    logger.fatal("SHIT", failed);
1167:                    logger.fatal("---");
1168:                    String msg = Translate.get("requestmanager.request.failed",
1169:                            new String[] {
1170:                                    request.getSqlShortForm(vdb
1171:                                            .getSqlShortFormLength()),
1172:                                    failed.getMessage() });
1173:                    if (failed instanceof  RuntimeException)
1174:                        logger.warn(msg, failed);
1175:                    else
1176:                        logger.warn(msg);
1177:                    throw new SQLException(msg);
1178:                } finally {
1179:                    // Notify scheduler
1180:                    scheduler.writeCompleted(request);
1181:                }
1182:            }
1183:
1184:            /**
1185:             * Call a stored procedure that returns a ResultSet.
1186:             *
1187:             * @param proc the stored procedure call
1188:             * @return a <code>ControllerResultSet</code> value
1189:             * @throws AllBackendsFailedException if all backends failed to execute the
1190:             *           stored procedure
1191:             * @exception SQLException if an error occurs
1192:             */
1193:            public ControllerResultSet callableStatementExecuteQuery(
1194:                    StoredProcedure proc) throws AllBackendsFailedException,
1195:                    SQLException {
1196:                ControllerResultSet result = null;
1197:                boolean hasBeenScheduled = false;
1198:                boolean success = false;
1199:                try {
1200:                    scheduleStoredProcedure(proc);
1201:                    hasBeenScheduled = true;
1202:
1203:                    if (logger.isDebugEnabled())
1204:                        logger.debug(Translate.get(
1205:                                "requestmanager.read.stored.procedure",
1206:                                new String[] {
1207:                                        String.valueOf(proc.getId()),
1208:                                        proc.getSqlShortForm(vdb
1209:                                                .getSqlShortFormLength()) }));
1210:
1211:                    result = loadBalanceCallableStatementExecuteQuery(proc);
1212:
1213:                    updateRecoveryLogFlushCacheAndRefreshSchema(proc);
1214:
1215:                    success = true;
1216:                    return result;
1217:                } catch (AllBackendsFailedException e) {
1218:                    throw e;
1219:                } catch (NoMoreBackendException e) {
1220:                    throw e;
1221:                } catch (Exception failed) {
1222:                    String msg = Translate.get(
1223:                            "requestmanager.stored.procedure.failed",
1224:                            new String[] {
1225:                                    proc.getSqlShortForm(vdb
1226:                                            .getSqlShortFormLength()),
1227:                                    failed.getMessage() });
1228:                    logger.warn(msg);
1229:                    if (failed instanceof  SQLException) {
1230:                        throw (SQLException) failed;
1231:                    }
1232:                    throw new SQLException(msg);
1233:                } finally {
1234:                    if (hasBeenScheduled)
1235:                        scheduler.storedProcedureCompleted(proc);
1236:                    if (!vdb.isDistributed() && !success) { // Notify log of failure
1237:                        if (recoveryLog != null)
1238:                            recoveryLog.logRequestCompletion(proc.getLogId(),
1239:                                    false, proc.getExecTimeInMs());
1240:                    }
1241:                }
1242:            }
1243:
1244:            /**
1245:             * Call a stored procedure that performs an update.
1246:             *
1247:             * @param proc the stored procedure call
1248:             * @return number of rows affected
1249:             * @throws AllBackendsFailedException if all backends failed to execute the
1250:             *           stored procedure
1251:             * @exception SQLException if an error occurs
1252:             */
1253:            public ExecuteUpdateResult callableStatementExecuteUpdate(
1254:                    StoredProcedure proc) throws AllBackendsFailedException,
1255:                    SQLException {
1256:                ExecuteUpdateResult result;
1257:                boolean hasBeenScheduled = false;
1258:                boolean success = false;
1259:                try {
1260:                    // Wait for the scheduler to give us the authorization to execute
1261:                    scheduleStoredProcedure(proc);
1262:                    hasBeenScheduled = true;
1263:
1264:                    if (logger.isDebugEnabled())
1265:                        logger.debug(Translate.get(
1266:                                "requestmanager.write.stored.procedure",
1267:                                new String[] {
1268:                                        String.valueOf(proc.getId()),
1269:                                        proc.getSqlShortForm(vdb
1270:                                                .getSqlShortFormLength()) }));
1271:
1272:                    result = loadBalanceCallableStatementExecuteUpdate(proc);
1273:
1274:                    updateRecoveryLogFlushCacheAndRefreshSchema(proc);
1275:
1276:                    // Notify scheduler of completion
1277:                    scheduler.storedProcedureCompleted(proc);
1278:
1279:                    success = true;
1280:                    return result;
1281:                } catch (AllBackendsFailedException e) {
1282:                    throw e;
1283:                } catch (Exception failed) {
1284:                    String msg = Translate.get(
1285:                            "requestmanager.stored.procedure.failed",
1286:                            new String[] {
1287:                                    proc.getSqlShortForm(vdb
1288:                                            .getSqlShortFormLength()),
1289:                                    failed.getMessage() });
1290:                    logger.warn(msg);
1291:                    if (failed instanceof  SQLException) {
1292:                        throw (SQLException) failed;
1293:                    }
1294:                    throw new SQLException(msg);
1295:                } finally {
1296:                    if (hasBeenScheduled)
1297:                        scheduler.storedProcedureCompleted(proc);
1298:                    if (!vdb.isDistributed() && !success) { // Notify log of failure
1299:                        if (recoveryLog != null)
1300:                            recoveryLog.logRequestCompletion(proc.getLogId(),
1301:                                    false, proc.getExecTimeInMs());
1302:                    }
1303:                }
1304:            }
1305:
1306:            /**
1307:             * Execute a call to CallableStatement.execute() and returns a suite of
1308:             * updateCount and/or ResultSets.
1309:             *
1310:             * @param proc the stored procedure to execute
1311:             * @return an <code>ExecuteResult</code> object
1312:             * @throws AllBackendsFailedException if all backends failed to execute the
1313:             *           stored procedure
1314:             * @exception SQLException if an error occurs
1315:             */
1316:            public ExecuteResult callableStatementExecute(StoredProcedure proc)
1317:                    throws AllBackendsFailedException, SQLException {
1318:                ExecuteResult result;
1319:                boolean hasBeenScheduled = false;
1320:                boolean success = false;
1321:                try {
1322:                    scheduleStoredProcedure(proc);
1323:                    hasBeenScheduled = true;
1324:
1325:                    if (logger.isDebugEnabled())
1326:                        logger.debug(Translate.get(
1327:                                "requestmanager.write.stored.procedure",
1328:                                new String[] {
1329:                                        String.valueOf(proc.getId()),
1330:                                        proc.getSqlShortForm(vdb
1331:                                                .getSqlShortFormLength()) }));
1332:
1333:                    result = loadBalanceCallableStatementExecute(proc);
1334:
1335:                    updateRecoveryLogFlushCacheAndRefreshSchema(proc);
1336:
1337:                    success = true;
1338:                    return result;
1339:                } catch (AllBackendsFailedException e) {
1340:                    throw e;
1341:                } catch (NoMoreBackendException e) {
1342:                    throw e;
1343:                } catch (Exception failed) {
1344:                    String msg = Translate.get(
1345:                            "requestmanager.stored.procedure.failed",
1346:                            new String[] {
1347:                                    proc.getSqlShortForm(vdb
1348:                                            .getSqlShortFormLength()),
1349:                                    failed.getMessage() });
1350:                    logger.warn(msg);
1351:                    if (failed instanceof  SQLException) {
1352:                        throw (SQLException) failed;
1353:                    }
1354:                    throw new SQLException(msg);
1355:                } finally {
1356:                    if (hasBeenScheduled)
1357:                        scheduler.storedProcedureCompleted(proc);
1358:                    if (!vdb.isDistributed() && !success) { // Notify log of failure
1359:                        if (recoveryLog != null)
1360:                            recoveryLog.logRequestCompletion(proc.getLogId(),
1361:                                    false, proc.getExecTimeInMs());
1362:                    }
1363:                }
1364:            }
1365:
1366:            /**
1367:             * This method does some sanity check on the given stored procedure and then
1368:             * tries to schedule it. Note that it is more likely that on a stored
1369:             * procedure the scheduler will lock in write the entire database as it does
1370:             * not know which tables are accessed by the procedure.
1371:             *
1372:             * @param proc the stored procedure to schedule
1373:             * @throws SQLException if an error occurs
1374:             */
1375:            public void scheduleStoredProcedure(StoredProcedure proc)
1376:                    throws SQLException {
1377:                // Sanity check
1378:                if (!proc.isAutoCommit()) { // Check that the transaction has been started
1379:                    long tid = proc.getTransactionId();
1380:                    if (!transactionMetaDatas.containsKey(new Long(tid)))
1381:                        throw new SQLException(Translate.get(
1382:                                "transaction.not.started", tid));
1383:                }
1384:
1385:                getParsingFromCacheOrParse(proc);
1386:
1387:                //
1388:                // SCHEDULER
1389:                //
1390:
1391:                // Wait for the scheduler to give us the authorization to execute
1392:                try {
1393:                    scheduler.scheduleStoredProcedure(proc);
1394:                } catch (RollbackException e) { // Something bad happened and we need to rollback this transaction
1395:                    rollback(proc.getTransactionId(), true);
1396:                    throw new SQLException(e.getMessage());
1397:                }
1398:            }
1399:
1400:            /**
1401:             * Execute a read stored procedure on the load balancer. Note that we flush
1402:             * the cache before calling the load balancer.
1403:             *
1404:             * @param proc the stored procedure to call
1405:             * @return the corresponding ControllerResultSet
1406:             * @throws SQLException if an error occurs
1407:             * @throws AllBackendsFailedException if all backends failed to execute the
1408:             *           stored procedure
1409:             */
1410:            public ControllerResultSet loadBalanceCallableStatementExecuteQuery(
1411:                    StoredProcedure proc) throws SQLException,
1412:                    AllBackendsFailedException {
1413:                DatabaseProcedureSemantic semantic = proc.getSemantic();
1414:                ControllerResultSet result;
1415:
1416:                //
1417:                // CACHE
1418:                //
1419:
1420:                // Cache is always flushed unless the user has explicitely set the
1421:                // connection to read-only mode in which case we assume that the
1422:                // users deliberately forces the cache not to be flushed when calling
1423:                // this stored procedure.
1424:                if ((resultCache != null) && (!proc.isReadOnly())) {
1425:                    if ((semantic == null) || (semantic.isWrite()))
1426:                        resultCache.flushCache();
1427:                }
1428:
1429:                //
1430:                // LOAD BALANCER
1431:                //
1432:
1433:                // Send the request to the load balancer
1434:                if (proc.isReadOnly()
1435:                        || ((semantic != null) && (semantic.isReadOnly())))
1436:                    result = loadBalancer
1437:                            .readOnlyCallableStatementExecuteQuery(proc,
1438:                                    metadataCache);
1439:                else {
1440:                    // Disable fetch size with a distributed execution
1441:                    proc.setFetchSize(0);
1442:                    result = loadBalancer.callableStatementExecuteQuery(proc,
1443:                            metadataCache);
1444:                }
1445:                return result;
1446:            }
1447:
1448:            /**
1449:             * Execute a write stored procedure on the load balancer. Note that we flush
1450:             * the cache before calling the load balancer.
1451:             *
1452:             * @param proc the stored procedure to call
1453:             * @return the number of updated rows
1454:             * @throws SQLException if an error occurs
1455:             * @throws AllBackendsFailedException if all backends failed to execute the
1456:             *           stored procedure
1457:             */
1458:            public ExecuteUpdateResult loadBalanceCallableStatementExecuteUpdate(
1459:                    StoredProcedure proc) throws AllBackendsFailedException,
1460:                    SQLException {
1461:                ExecuteUpdateResult result;
1462:                //
1463:                // CACHE
1464:                //
1465:
1466:                // Flush cache (if any) before as we don't properly lock the tables
1467:                if (resultCache != null) {
1468:                    DatabaseProcedureSemantic semantic = proc.getSemantic();
1469:                    if ((semantic == null) || (semantic.isWrite()))
1470:                        resultCache.flushCache();
1471:                }
1472:
1473:                //
1474:                // LOAD BALANCER
1475:                //
1476:
1477:                // Send the request to the load balancer
1478:                result = loadBalancer.callableStatementExecuteUpdate(proc);
1479:                return result;
1480:            }
1481:
1482:            /**
1483:             * Execute a write stored procedure on the load balancer. Note that we flush
1484:             * the cache before calling the load balancer.
1485:             *
1486:             * @param proc the stored procedure to call
1487:             * @return an <code>ExecuteResult</code> object
1488:             * @throws SQLException if an error occurs
1489:             * @throws AllBackendsFailedException if all backends failed to execute the
1490:             *           stored procedure
1491:             */
1492:            public ExecuteResult loadBalanceCallableStatementExecute(
1493:                    StoredProcedure proc) throws AllBackendsFailedException,
1494:                    SQLException {
1495:                DatabaseProcedureSemantic semantic = proc.getSemantic();
1496:                ExecuteResult result;
1497:
1498:                //
1499:                // CACHE
1500:                //
1501:
1502:                // Flush cache (if any) before as we don't properly lock the tables
1503:                if (resultCache != null) {
1504:                    if ((semantic == null) || (semantic.isWrite()))
1505:                        resultCache.flushCache();
1506:                }
1507:
1508:                //
1509:                // LOAD BALANCER
1510:                //
1511:
1512:                // Send the request to the load balancer
1513:                if (proc.isReadOnly()
1514:                        || ((semantic != null) && (semantic.isReadOnly())))
1515:                    result = loadBalancer.readOnlyCallableStatementExecute(
1516:                            proc, metadataCache);
1517:                else
1518:                    result = loadBalancer.callableStatementExecute(proc,
1519:                            metadataCache);
1520:                return result;
1521:            }
1522:
1523:            /**
1524:             * Update the recovery log with successful completion of the query, flush the
1525:             * cache and force schema refresh if needed.
1526:             *
1527:             * @param proc the stored procedure to log
1528:             * @throws SQLException if the transaction context could not be updated
1529:             */
1530:            public void updateRecoveryLogFlushCacheAndRefreshSchema(
1531:                    StoredProcedure proc) throws SQLException {
1532:                // Stored procedures executing on a read-only connection don't update
1533:                // anything
1534:                if (proc.isReadOnly())
1535:                    return;
1536:
1537:                DatabaseProcedureSemantic semantic = proc.getSemantic();
1538:                TransactionMetaData tm = null;
1539:                if (!proc.isAutoCommit())
1540:                    tm = getTransactionMetaData(new Long(proc
1541:                            .getTransactionId()));
1542:
1543:                if ((semantic == null) || (semantic.hasDDLWrite())) {
1544:                    // Schema might have been updated, force refresh
1545:                    if ((semantic == null) && (logger.isDebugEnabled()))
1546:                        logger.debug("No semantic found for stored procedure "
1547:                                + proc.getSqlShortForm(vdb
1548:                                        .getSqlShortFormLength())
1549:                                + ". Forcing a schema refresh.");
1550:                    setSchemaIsDirty(true);
1551:
1552:                    if (tm != null)
1553:                        tm.setAltersDatabaseSchema(true);
1554:                }
1555:
1556:                if ((semantic == null) || (semantic.isWrite())) {
1557:                    // Update the recovery log (if there is one and we are not using SingleDB)
1558:                    if ((recoveryLog != null)
1559:                            && (loadBalancer.getRAIDbLevel() != RAIDbLevels.SingleDB))
1560:                        recoveryLog.logRequestCompletion(proc.getLogId(), true,
1561:                                proc.getExecTimeInMs());
1562:
1563:                    //
1564:                    // Update transaction context if needed to force commit to be logged
1565:                    // (occurs if callableStatement.executeQuery() is called without semantic
1566:                    // information.
1567:                    //
1568:                    if (tm != null) {
1569:                        tm.setReadOnly(false);
1570:                        tm.setAltersQueryResultCache(true);
1571:                    }
1572:
1573:                    //
1574:                    // CACHE
1575:                    //
1576:
1577:                    // Flush cache (if any) after for consistency (we don't know what has been
1578:                    // modified by the stored procedure)
1579:                    if (resultCache != null)
1580:                        resultCache.flushCache();
1581:
1582:                    if (metadataCache != null)
1583:                        metadataCache.flushCache();
1584:                }
1585:            }
1586:
1587:            /**
1588:             * Return a ControllerResultSet containing the PreparedStatement metaData of
1589:             * the given sql template
1590:             *
1591:             * @param request the request containing the sql template
1592:             * @return an empty ControllerResultSet with the metadata
1593:             * @throws SQLException if a database error occurs
1594:             */
1595:            public ControllerResultSet getPreparedStatementGetMetaData(
1596:                    AbstractRequest request) throws SQLException {
1597:                return loadBalancer.getPreparedStatementGetMetaData(request);
1598:            }
1599:
1600:            //
1601:            // Transaction management
1602:            //
1603:
1604:            /**
1605:             * Begin a new transaction and return the corresponding transaction
1606:             * identifier. This method is called from the driver when setAutoCommit(false)
1607:             * is called.
1608:             * <p>
1609:             * Note that the transaction begin is not logged in the recovery log by this
1610:             * method, you will have to call logLazyTransactionBegin.
1611:             *
1612:             * @param login the login used by the connection
1613:             * @param isPersistentConnection true if the transaction is started on a
1614:             *          persistent connection
1615:             * @param persistentConnectionId persistent connection id if the transaction
1616:             *          must be started on a persistent connection
1617:             * @return long a unique transaction identifier
1618:             * @throws SQLException if an error occurs
1619:             * @see #logLazyTransactionBegin(long)
1620:             */
1621:            public long begin(String login, boolean isPersistentConnection,
1622:                    long persistentConnectionId) throws SQLException {
1623:                long tid = scheduler.getNextTransactionId();
1624:                doBegin(login, tid, isPersistentConnection,
1625:                        persistentConnectionId);
1626:                return tid;
1627:            }
1628:
1629:            /**
1630:             * Begins a new transaction for the given <code>login</code> and
1631:             * <code>tid</code>. Informs the loadbalancer and the scheduler about this
1632:             * begun transaction.
1633:             *
1634:             * @param login the login used by the connection
1635:             * @param tid the tid associed with the transaction to begin
1636:             * @param isPersistentConnection true if the transaction is started on a
1637:             *          persistent connection
1638:             * @param persistentConnectionId persistent connection id if the transaction
1639:             *          must be started on a persistent connection
1640:             * @throws SQLException if an error occurs
1641:             */
1642:            public void doBegin(String login, long tid,
1643:                    boolean isPersistentConnection, long persistentConnectionId)
1644:                    throws SQLException {
1645:                try {
1646:                    TransactionMetaData tm = new TransactionMetaData(tid,
1647:                            beginTimeout, login, isPersistentConnection,
1648:                            persistentConnectionId);
1649:                    scheduler.begin(tm, false, null);
1650:
1651:                    if (logger.isDebugEnabled())
1652:                        logger.debug(Translate.get("transaction.begin", String
1653:                                .valueOf(tid)));
1654:
1655:                    try {
1656:                        // Send to load balancer
1657:                        loadBalancer.begin(tm);
1658:                        // the tid is stored *now* before notifying the scheduler of begin
1659:                        // completion to avoid releasing pending write queries before
1660:                        // transaction list is updated
1661:                        transactionMetaDatas.put(new Long(tid), tm);
1662:                    } catch (SQLException e) {
1663:                        throw e;
1664:                    } finally {
1665:                        // Notify scheduler for completion in any case
1666:                        scheduler.beginCompleted(tid);
1667:                    }
1668:                } catch (RuntimeException e) {
1669:                    String msg = Translate
1670:                            .get("fatal.runtime.exception.requestmanager.begin");
1671:                    logger.fatal(msg, e);
1672:                    endUserLogger.fatal(msg);
1673:                    throw new SQLException(e.getMessage());
1674:                }
1675:            }
1676:
1677:            /**
1678:             * Log the begin of a transaction that is started lazily. In fact, we just log
1679:             * the begin when we execute the first write request in a transaction to
1680:             * prevent logging begin/commit for read-only transactions. This also prevents
1681:             * a problem with backends that are disabled with a checkpoint when no request
1682:             * has been played in the transaction but the begin statement has already been
1683:             * logged. In that case, the transaction would not be properly replayed at
1684:             * restore time.
1685:             *
1686:             * @param transactionId the transaction id begin to log
1687:             * @throws SQLException if an error occurs
1688:             */
1689:            public void logLazyTransactionBegin(long transactionId)
1690:                    throws SQLException {
1691:                try {
1692:                    Long tid = new Long(transactionId);
1693:                    TransactionMetaData tm = getTransactionMetaData(tid);
1694:
1695:                    if (logger.isDebugEnabled())
1696:                        logger.debug(Translate.get("transaction.begin.log",
1697:                                String.valueOf(transactionId)));
1698:
1699:                    // Log the begin
1700:                    if (recoveryLog != null)
1701:                        recoveryLog.logBegin(tm);
1702:                } catch (RuntimeException e) {
1703:                    String msg = Translate
1704:                            .get("fatal.runtime.exception.requestmanager.begin.log");
1705:                    logger.fatal(msg, e);
1706:                    endUserLogger.fatal(msg);
1707:                    throw new SQLException(e.getMessage());
1708:                }
1709:            }
1710:
1711:            /**
1712:             * Abort a transaction that has been started but in which no query was
1713:             * executed. As we use lazy transaction begin, there is no need to roll back
1714:             * such a transaction, only to clean up the metadata associated with it. When
1715:             * no metadata is registered for the id, a fake context is created for the abort.
1716:             *
1717:             * @param transactionId id of the transaction to abort
1718:             * @param logAbort true if the abort (in fact rollback) should be logged in
1719:             *          the recovery log
1720:             * @param forceAbort true if the abort will be forced. Actually, abort will do
1721:             *          nothing when a transaction has savepoints (we do not abort the
1722:             *          whole transaction, so that the user can roll back to a previous
1723:             *          savepoint), except when the connection is closed. In this last
1724:             *          case, if the transaction is not aborted, it prevents future
1725:             *          maintenance operations such as shutdowns, enable/disable from
1726:             *          completing, so we have to force this abort operation. It also
1727:             *          applies to the DeadlockDetectionThread and the cleanup of the
1728:             *          VirtualDatabaseWorkerThread.
1729:             * @throws SQLException if the abort fails (runtime errors are wrapped, cause kept)
1730:             */
1731:            public void abort(long transactionId, boolean logAbort,
1732:                    boolean forceAbort) throws SQLException {
1733:                TransactionMetaData tm;
1734:                try {
1735:                    Long tid = new Long(transactionId);
1736:                    boolean tmIsFake = false;
1737:                    try {
1738:                        tm = getTransactionMetaData(tid);
1739:                        if (!forceAbort && tidSavepoints.get(tid) != null) {
1740:                            if (logger.isDebugEnabled())
1741:                                logger
1742:                                        .debug("Transaction "
1743:                                                + transactionId
1744:                                                + " has savepoints, transaction will not be aborted");
1745:                            return;
1746:                        }
1747:                    } catch (SQLException e1) {
1748:                        logger
1749:                                .warn("No transaction metadata found to abort transaction "
1750:                                        + transactionId
1751:                                        + ". Creating a fake context for abort.");
1752:                        // Note that we ignore the persistent connection id (it will be
1753:                        // retrieved by the connection manager)
1754:                        tm = new TransactionMetaData(transactionId, 0,
1755:                                RecoveryLog.UNKNOWN_USER, false, 0);
1756:                        tm.setReadOnly(!logAbort);
1757:                        tmIsFake = true;
1758:                        if (tidSavepoints.get(tid) != null) { // NOTE(review): forceAbort is not checked here, unlike above - confirm
1759:                            if (logger.isDebugEnabled())
1760:                                logger
1761:                                        .debug("Transaction "
1762:                                                + transactionId
1763:                                                + " has savepoints, transaction will not be aborted");
1764:                            return;
1765:                        }
1766:                    }
1767:                    // Warn the worker thread serving this transaction, if there is one
1768:                    VirtualDatabaseWorkerThread vdbwt = vdb
1769:                            .getVirtualDatabaseWorkerThreadForTransaction(transactionId);
1770:                    if (vdbwt != null)
1771:                        vdbwt.notifyAbort(transactionId);
1772:
1773:                    if (logger.isDebugEnabled())
1774:                        logger.debug(Translate.get("transaction.aborting",
1775:                                String.valueOf(transactionId)));
1776:
1777:                    // Notify the scheduler to abort which is the same as a rollback
1778:                    // from a scheduler point of view.
1779:
1780:                    boolean abortScheduled = false;
1781:                    try {
1782:                        if (!tmIsFake) {
1783:                            scheduler.rollback(tm, null);
1784:                            abortScheduled = true;
1785:                        }
1786:                        // The load balancer is notified even for a fake context
1787:                        loadBalancer.abort(tm);
1788:
1789:                        // Update recovery log
1790:                        if (recoveryLog != null)
1791:                            recoveryLog.logRequestCompletion(tm.getLogId(),
1792:                                    true, 0);
1793:
1794:                        // Invalidate the query result cache if this transaction has updated the
1795:                        // cache or altered the schema
1796:                        if ((resultCache != null)
1797:                                && (tm.altersQueryResultCache() || tm
1798:                                        .altersDatabaseSchema()))
1799:                            resultCache.rollback(transactionId);
1800:
1801:                        // Check for schema modifications that need to be rolled back
1802:                        if (tm.altersDatabaseSchema()) {
1803:                            if (metadataCache != null)
1804:                                metadataCache.flushCache();
1805:                            setSchemaIsDirty(true);
1806:                        }
1807:                    }
1808:
1809:                    // FIXME This whole catch block is ugly. In particular the Rollback task
1810:                    // is re-created. The original task is created and posted on the total
1811:                    // order queue by the call to loadBalancer.abort() above.
1812:                    // The aim is to ensure that we log the rollback when no backends are
1813:                    // enabled even for read only transactions. This is needed since a begin
1814:                    // could have been previously logged.
1815:                    catch (NoMoreBackendException e) {
1816:                        // Log the query in any case for later recovery (if the request really
1817:                        // failed, it will be unlogged later)
1818:                        if (getRecoveryLog() != null) {
1819:                            if (logger.isDebugEnabled())
1820:                                logger
1821:                                        .debug(Translate
1822:                                                .get(
1823:                                                        "virtualdatabase.distributed.abort.logging.only",
1824:                                                        transactionId));
1825:
1826:                            // Wait to be sure that we log in the proper order
1827:                            DistributedRollback totalOrderAbort = new DistributedRollback(
1828:                                    tm.getLogin(), transactionId);
1829:                            if (getLoadBalancer().waitForTotalOrder(
1830:                                    totalOrderAbort, false))
1831:                                getLoadBalancer()
1832:                                        .removeObjectFromAndNotifyTotalOrderQueue(
1833:                                                totalOrderAbort);
1834:
1835:                            // Update recovery log (completion recorded as failed)
1836:                            this .getRecoveryLog().logRequestCompletion(
1837:                                    tm.getLogId(), false, 0);
1838:                            e.setRecoveryLogId(tm.getLogId());
1839:                            e.setLogin(tm.getLogin());
1840:                        }
1841:                        throw e;
1842:                    }
1843:
1844:                    catch (SQLException e) {
1845:                        /*
1846:                         * Check whether we have to remove the entry from the total order queue
1847:                         * (wait to be sure that we do it in the proper order)
1848:                         */
1849:                        DistributedRollback totalOrderAbort = new DistributedRollback(
1850:                                tm.getLogin(), transactionId);
1851:                        if (loadBalancer.waitForTotalOrder(totalOrderAbort,
1852:                                false))
1853:                            loadBalancer
1854:                                    .removeObjectFromAndNotifyTotalOrderQueue(totalOrderAbort);
1855:
1856:                        // Update recovery log (completion recorded as failed)
1857:                        if (recoveryLog != null)
1858:                            recoveryLog.logRequestCompletion(tm.getLogId(),
1859:                                    false, 0);
1860:
1861:                        throw e;
1862:                    } finally {
1863:                        if (abortScheduled) // is not set if tm is fake
1864:                        {
1865:                            // Notify scheduler for completion
1866:                            scheduler.rollbackCompleted(tm, true);
1867:                        }
1868:                        // The transaction is always forgotten, whether the abort succeeded or not
1869:                        completeTransaction(tid);
1870:
1871:                        if (logger.isDebugEnabled())
1872:                            logger.debug(Translate.get("transaction.aborted",
1873:                                    String.valueOf(transactionId)));
1874:                    }
1875:                } catch (RuntimeException e) {
1876:                    String msg = Translate
1877:                            .get("fatal.runtime.exception.requestmanager.abort");
1878:                    logger.fatal(msg, e);
1879:                    endUserLogger.fatal(msg);
1880:                    throw (SQLException) new SQLException(e.getMessage()).initCause(e); // keep the root cause
1881:                }
1882:            }
1883:
1884:            /**
1885:             * Look up the metadata registered for a transaction.
1886:             *
1887:             * @param tid transaction id
1888:             * @return the TransactionMetaData stored for this id, never null
1889:             * @throws SQLException if no marker has been found for this transaction
1890:             */
1891:            public TransactionMetaData getTransactionMetaData(Long tid)
1892:                    throws SQLException {
1893:                Object marker = transactionMetaDatas.get(tid);
1894:                if (marker != null) {
1895:                    // Known transaction: hand its metadata back to the caller
1896:                    return (TransactionMetaData) marker;
1897:                }
1898:
1899:                // Unknown id: report it with the "marker not found" message
1900:                String unknownId = String.valueOf(tid);
1901:                throw new SQLException(Translate.get("transaction.marker.not.found", unknownId));
1902:            }
1903:
1904:            /**
1905:             * Forget a finished transaction: drop its metadata and its savepoints.
1906:             *
1907:             * @param tid transaction id
1908:             */
1909:            public void completeTransaction(Long tid) {
1910:                boolean waitsForAll = loadBalancer.waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL;
1911:                if (!waitsForAll) {
1912:                    // Asynchronous execution: keep the entry while backends still hold locks
1913:                    TransactionMetaData pending = (TransactionMetaData) transactionMetaDatas
1914:                            .get(tid);
1915:                    if (pending == null) {
1916:                        // No metadata registered for this id, only the savepoints are dropped
1917:                        tidSavepoints.remove(tid);
1918:                        return;
1919:                    }
1920:                    if (pending.isLockedByBackends())
1921:                        return; // not everyone has released its locks yet
1922:                }
1923:                transactionMetaDatas.remove(tid);
1924:                tidSavepoints.remove(tid);
1925:            }
1926:
1927:            /**
1928:             * Commit a transaction given its id; its metadata is dropped only on success.
1929:             *
1930:             * @param transactionId the transaction id
1931:             * @param logCommit true if the commit should be logged in the recovery log (not read here)
1932:             * @param emptyTransaction true if this transaction has not executed any
1933:             *          request
1934:             * @throws SQLException if the commit fails; wrapped failures keep their cause
1935:             */
1936:            public void commit(long transactionId, boolean logCommit,
1937:                    boolean emptyTransaction) throws SQLException {
1938:                try {
1939:                    Long tid = new Long(transactionId);
1940:                    TransactionMetaData tm = getTransactionMetaData(tid);
1941:                    // commitScheduled: scheduler.commit returned; success: every step below completed
1942:                    boolean commitScheduled = false;
1943:                    boolean success = false;
1944:                    try {
1945:                        scheduler.commit(tm, emptyTransaction, null);
1946:                        commitScheduled = true;
1947:                        if (!emptyTransaction) {
1948:                            if (logger.isDebugEnabled())
1949:                                logger.debug(Translate.get(
1950:                                        "transaction.commit", String
1951:                                                .valueOf(tid)));
1952:
1953:                            // Send to load balancer
1954:                            loadBalancer.commit(tm);
1955:
1956:                            // Notify the cache
1957:                            if (resultCache != null)
1958:                                resultCache.commit(tm.getTransactionId());
1959:                        }
1960:                        success = true;
1961:                    } catch (SQLException e) {
1962:                        /*
1963:                         * Check whether we have to remove the entry from the total order queue
1964:                         * (wait to be sure that we do it in the proper order)
1965:                         */
1966:                        DistributedCommit totalOrderCommit = new DistributedCommit(
1967:                                tm.getLogin(), transactionId);
1968:                        if (loadBalancer.waitForTotalOrder(totalOrderCommit,
1969:                                false))
1970:                            loadBalancer
1971:                                    .removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1972:
1973:                        throw e;
1974:                    } catch (AllBackendsFailedException e) {
1975:                        String msg = "All backends failed to commit transaction "
1976:                                + transactionId + " (" + e + ")";
1977:                        logger.error(msg);
1978:                        throw (SQLException) new SQLException(msg).initCause(e); // keep the root cause
1979:                    } finally {
1980:                        // Update the recovery log
1981:                        // The recovery log will take care of read-only transactions to log
1982:                        // only what is required.
1983:                        if (recoveryLog != null && tm.getLogId() != 0)
1984:                            recoveryLog.logRequestCompletion(tm.getLogId(),
1985:                                    success, 0);
1986:                        if (commitScheduled) {
1987:                            // Notify scheduler for completion
1988:                            scheduler.commitCompleted(tm, success);
1989:                        }
1990:                        if (success) { // on failure the transaction metadata is kept
1991:                            completeTransaction(tid);
1992:                        }
1993:                    }
1994:                } catch (RuntimeException e) {
1995:                    String msg = Translate
1996:                            .get("fatal.runtime.exception.requestmanager.commit");
1997:                    logger.fatal(msg, e);
1998:                    endUserLogger.fatal(msg);
1999:                    throw (SQLException) new SQLException(e.getMessage()).initCause(e); // keep the root cause
2000:                }
2001:            }
2002:
2003:            /**
2004:             * Rollback a transaction given its id; the transaction is always forgotten afterwards.
2005:             *
2006:             * @param transactionId the transaction id
2007:             * @param logRollback true if the rollback should be logged in the recovery
2008:             *          log (not read by this method's body)
2009:             * @throws SQLException if the rollback fails; wrapped failures keep their cause
2010:             */
2011:            public void rollback(long transactionId, boolean logRollback)
2012:                    throws SQLException {
2013:                try {
2014:                    Long tid = new Long(transactionId);
2015:                    TransactionMetaData tm = getTransactionMetaData(tid);
2016:
2017:                    // Wait for the scheduler to give us the authorization to execute
2018:
2019:                    boolean rollbackScheduled = false;
2020:                    boolean success = false;
2021:                    try {
2022:                        scheduler.rollback(tm, null);
2023:                        rollbackScheduled = true;
2024:
2025:                        if (logger.isDebugEnabled())
2026:                            logger.debug(Translate.get("transaction.rollback",
2027:                                    String.valueOf(transactionId)));
2028:
2029:                        // Send to load balancer
2030:                        loadBalancer.rollback(tm);
2031:
2032:                        // Invalidate the query result cache if this transaction has updated the
2033:                        // cache or altered the schema
2034:                        if ((resultCache != null)
2035:                                && (tm.altersQueryResultCache() || tm
2036:                                        .altersDatabaseSchema()))
2037:                            resultCache.rollback(transactionId);
2038:
2039:                        // Check for schema modifications that need to be rolled back
2040:                        if (tm.altersDatabaseSchema()) {
2041:                            if (metadataCache != null)
2042:                                metadataCache.flushCache();
2043:                            setSchemaIsDirty(true);
2044:                        }
2045:
2046:                        success = true;
2047:                    } catch (SQLException e) {
2048:                        /*
2049:                         * Check whether we have to remove the entry from the total order queue
2050:                         * (wait to be sure that we do it in the proper order)
2051:                         */
2052:                        DistributedRollback totalOrderRollback = new DistributedRollback(
2053:                                tm.getLogin(), transactionId);
2054:                        if (loadBalancer.waitForTotalOrder(totalOrderRollback,
2055:                                false)) {
2056:                            loadBalancer
2057:                                    .removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
2058:                        } else if (recoveryLog != null) { // Force rollback logging even in the case of failure. The recovery
2059:                            // log will take care of empty or read-only transactions that do not
2060:                            // need to log the rollback.
2061:                            if (!recoveryLog.findRollbackForTransaction(tm
2062:                                    .getTransactionId()))
2063:                                recoveryLog.logRollback(tm);
2064:                            if (!rollbackScheduled) // otherwise the finally block logs the completion
2065:                                recoveryLog.logRequestCompletion(tm.getLogId(),
2066:                                        true, 0);
2067:                        }
2068:
2069:                        throw e;
2070:                    } catch (AllBackendsFailedException e) {
2071:                        String msg = Translate.get(
2072:                                "requestmanager.rollback.failed.all",
2073:                                new String[] { String.valueOf(transactionId),
2074:                                        e.getMessage() });
2075:                        logger.error(msg);
2076:                        throw (SQLException) new SQLException(msg).initCause(e); // keep the root cause
2077:                    } finally {
2078:                        if (rollbackScheduled) {
2079:                            // Update the recovery log
2080:                            if (recoveryLog != null)
2081:                                recoveryLog.logRequestCompletion(tm.getLogId(),
2082:                                        true, 0);
2083:
2084:                            // Notify scheduler for completion
2085:                            scheduler.rollbackCompleted(tm, success);
2086:                        }
2087:                        // Unlike commit, the transaction is forgotten even when the rollback failed
2088:                        completeTransaction(tid);
2089:                    }
2090:                } catch (RuntimeException e) {
2091:                    String msg = Translate
2092:                            .get("fatal.runtime.exception.requestmanager.rollback");
2093:                    logger.fatal(msg, e);
2094:                    endUserLogger.fatal(msg);
2095:                    throw (SQLException) new SQLException(e.getMessage()).initCause(e); // keep the root cause
2096:                }
2097:            }
2098:
2099:            /**
2100:             * Rollback a transaction given its id to a savepoint given its name.
2101:             *
2102:             * @param transactionId the transaction id
2103:             * @param savepointName the name of the savepoint
2104:             * @throws SQLException if the savepoint is unknown or the rollback fails (cause kept when wrapped)
2105:             */
2106:            public void rollback(long transactionId, String savepointName)
2107:                    throws SQLException {
2108:                try {
2109:                    Long tid = new Long(transactionId);
2110:                    TransactionMetaData tm = getTransactionMetaData(tid);
2111:
2112:                    boolean rollbackScheduled = false;
2113:                    boolean validSavepoint = false;
2114:
2115:                    try {
2116:                        // Check that a savepoint with given name has been set
2117:                        if (!hasSavepoint(tid, savepointName))
2118:                            throw new SQLException(Translate.get(
2119:                                    "transaction.savepoint.not.found",
2120:                                    new String[] { savepointName,
2121:                                            String.valueOf(transactionId) }));
2122:
2123:                        validSavepoint = true;
2124:
2125:                        // Wait for the scheduler to give us the authorization to execute
2126:                        scheduler.rollback(tm, savepointName, null);
2127:                        rollbackScheduled = true;
2128:
2129:                        if (logger.isDebugEnabled())
2130:                            logger.debug(Translate.get(
2131:                                    "transaction.rollbacksavepoint",
2132:                                    new String[] {
2133:                                            String.valueOf(transactionId),
2134:                                            savepointName }));
2135:
2136:                        // Send to loadbalancer
2137:                        loadBalancer.rollbackToSavepoint(tm, savepointName);
2138:
2139:                        // Update the recovery log
2140:                        if (recoveryLog != null)
2141:                            recoveryLog.logRequestCompletion(tm.getLogId(),
2142:                                    true, 0);
2143:
2144:                        // Invalidate the query result cache if this transaction has updated the
2145:                        // cache or altered the schema
2146:                        if ((resultCache != null)
2147:                                && (tm.altersQueryResultCache() || tm
2148:                                        .altersDatabaseSchema()))
2149:                            resultCache.rollback(transactionId);
2150:
2151:                        // Check for schema modifications that need to be rolled back
2152:                        if (tm.altersDatabaseSchema()) {
2153:                            if (metadataCache != null)
2154:                                metadataCache.flushCache();
2155:                            setSchemaIsDirty(true);
2156:                        }
2157:                    } catch (SQLException e) {
2158:                        /*
2159:                         * Check whether we have to remove the entry from the total order queue
2160:                         * (wait to be sure that we do it in the proper order)
2161:                         */
2162:                        DistributedRollbackToSavepoint totalOrderRollbackToSavepoint = new DistributedRollbackToSavepoint(
2163:                                transactionId, savepointName);
2164:                        if (loadBalancer.waitForTotalOrder(
2165:                                totalOrderRollbackToSavepoint, false))
2166:                            loadBalancer
2167:                                    .removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollbackToSavepoint);
2168:
2169:                        throw e;
2170:                    } catch (AllBackendsFailedException e) {
2171:                        String msg = Translate.get(
2172:                                "requestmanager.rollbackavepoint.failed.all", // NOTE(review): key is spelled "rollbackavepoint" - confirm it matches the message bundle
2173:                                new String[] { String.valueOf(transactionId),
2174:                                        savepointName, e.getMessage() });
2175:                        logger.error(msg);
2176:                        throw (SQLException) new SQLException(msg).initCause(e); // keep the root cause
2177:                    } finally {
2178:                        if (rollbackScheduled) {
2179:                            // Notify scheduler for completion
2180:                            scheduler.savepointCompleted(transactionId);
2181:                        }
2182:
2183:                        if (validSavepoint)
2184:                            // Remove all the savepoints set after the savepoint we rollback to
2185:                            removeSavepoints(tid, savepointName);
2186:                    }
2187:                } catch (RuntimeException e) {
2188:                    String msg = Translate
2189:                            .get("fatal.runtime.exception.requestmanager.rollbacksavepoint");
2190:                    logger.fatal(msg, e);
2191:                    endUserLogger.fatal(msg);
2192:                    throw (SQLException) new SQLException(e.getMessage()).initCause(e); // keep the root cause
2193:                }
2194:            }
2195:
2196:            /**
2197:             * Sets a unnamed savepoint to a transaction given its id.
2198:             *
2199:             * @param transactionId the transaction id
2200:             * @return the generated id of the new savepoint
2201:             * @throws SQLException if an error occurs
2202:             */
2203:            public int setSavepoint(long transactionId) throws SQLException {
2204:                try {
2205:                    Long tid = new Long(transactionId);
2206:                    TransactionMetaData tm = getTransactionMetaData(tid);
2207:                    String savepointName = "unnamed savepoint";
2208:                    int savepointId;
2209:
2210:                    boolean setSavepointScheduled = false;
2211:                    try {
2212:                        // Wait for the scheduler to give us the authorization to execute
2213:                        savepointId = scheduler.setSavepoint(tm);
2214:                        setSavepointScheduled = true;
2215:
2216:                        savepointName = String.valueOf(savepointId);
2217:
2218:                        if (logger.isDebugEnabled())
2219:                            logger.debug(Translate.get(
2220:                                    "transaction.setsavepoint", new String[] {
2221:                                            savepointName,
2222:                                            String.valueOf(transactionId) }));
2223:
2224:                        // Send to loadbalancer
2225:                        loadBalancer.setSavepoint(tm, savepointName);
2226:
2227:                        // Update the recovery log
2228:                        if (recoveryLog != null)
2229:                            recoveryLog.logRequestCompletion(tm.getLogId(),
2230:                                    true, 0);
2231:                    } catch (AllBackendsFailedException e) {
2232:                        String msg = Translate.get(
2233:                                "requestmanager.setsavepoint.failed.all",
2234:                                new String[] { savepointName,
2235:                                        String.valueOf(transactionId),
2236:                                        e.getMessage() });
2237:                        logger.error(msg);
2238:                        throw new SQLException(msg);
2239:                    } catch (SQLException e) {
2240:                        throw e;
2241:                    } finally {
2242:                        if (setSavepointScheduled) {
2243:                            // Notify scheduler for completion
2244:                            scheduler.savepointCompleted(transactionId);
2245:                        }
2246:                    }
2247:
2248:                    // Add savepoint name to list of savepoints for this transaction
2249:                    addSavepoint(tid, savepointName);
2250:                    return savepointId;
2251:                } catch (RuntimeException e) {
2252:                    String msg = Translate
2253:                            .get("fatal.runtime.exception.requestmanager.setsavepoint");
2254:                    logger.fatal(msg, e);
2255:                    endUserLogger.fatal(msg);
2256:                    throw new SQLException(e.getMessage());
2257:                }
2258:            }
2259:
2260:            /**
2261:             * Sets a savepoint given its desired name to a transaction given its id.
2262:             *
2263:             * @param transactionId the transaction id
2264:             * @param name the desired name of the savepoint
2265:             * @throws SQLException if the savepoint cannot be set (cause kept when wrapped)
2266:             */
2267:            public void setSavepoint(long transactionId, String name)
2268:                    throws SQLException {
2269:                try {
2270:                    Long tid = new Long(transactionId);
2271:                    TransactionMetaData tm = getTransactionMetaData(tid);
2272:
2273:                    boolean setSavepointScheduled = false;
2274:                    try {
2275:                        // Wait for the scheduler to give us the authorization to execute
2276:                        scheduler.setSavepoint(tm, name, null);
2277:                        setSavepointScheduled = true;
2278:
2279:                        if (logger.isDebugEnabled())
2280:                            logger.debug(Translate.get(
2281:                                    "transaction.setsavepoint",
2282:                                    new String[] { name,
2283:                                            String.valueOf(transactionId) }));
2284:
2285:                        // Send to loadbalancer
2286:                        loadBalancer.setSavepoint(tm, name);
2287:
2288:                        // Update the recovery log
2289:                        if (recoveryLog != null)
2290:                            recoveryLog.logRequestCompletion(tm.getLogId(),
2291:                                    true, 0);
2292:                    } catch (AllBackendsFailedException e) {
2293:                        String msg = Translate.get(
2294:                                "requestmanager.setsavepoint.failed.all",
2295:                                new String[] { name,
2296:                                        String.valueOf(transactionId),
2297:                                        e.getMessage() });
2298:                        logger.error(msg);
2299:                        throw (SQLException) new SQLException(msg).initCause(e); // keep the root cause
2300:                    } catch (SQLException e) {
2301:                        /*
2302:                         * Check whether we have to remove the entry from the total order queue
2303:                         * (wait to be sure that we do it in the proper order)
2304:                         */
2305:                        DistributedSetSavepoint totalOrderSavePoint = new DistributedSetSavepoint(
2306:                                tm.getLogin(), transactionId, name);
2307:                        if (loadBalancer.waitForTotalOrder(totalOrderSavePoint,
2308:                                false))
2309:                            loadBalancer
2310:                                    .removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavePoint);
2311:
2312:                        throw e;
2313:                    } finally {
2314:                        if (setSavepointScheduled) {
2315:                            // Notify scheduler for completion
2316:                            scheduler.savepointCompleted(transactionId);
2317:                        }
2318:                    }
2319:
2320:                    // Reached only on success: add the name to the savepoints of this transaction
2321:                    addSavepoint(tid, name);
2322:                } catch (RuntimeException e) {
2323:                    String msg = Translate
2324:                            .get("fatal.runtime.exception.requestmanager.setsavepoint");
2325:                    logger.fatal(msg, e);
2326:                    endUserLogger.fatal(msg);
2327:                    throw (SQLException) new SQLException(e.getMessage()).initCause(e); // keep the root cause
2328:                }
2329:            }
2330:
            /**
             * Releases a savepoint given its name from a transaction given its id.
             * <p>
             * The savepoint must have been registered for the transaction (see
             * <code>hasSavepoint</code>), else an SQLException is thrown. The request
             * is then submitted to the scheduler, forwarded to the load balancer and
             * its completion is logged in the recovery log when there is one.
             * <p>
             * Whatever the outcome, the savepoint name is removed from the list of
             * savepoints kept for the transaction.
             *
             * @param transactionId the transaction id
             * @param name the name of the savepoint
             * @exception SQLException if the savepoint is unknown, if the release
             *              fails, or if a RuntimeException occurred (in which case
             *              only the message of the RuntimeException is carried over)
             */
            public void releaseSavepoint(long transactionId, String name)
                    throws SQLException {
                try {
                    Long tid = new Long(transactionId);
                    TransactionMetaData tm = getTransactionMetaData(tid);

                    // Set to true once the scheduler accepted the request, so that the
                    // finally block only notifies the scheduler of a completion for a
                    // request it has actually scheduled
                    boolean releasedSavepointScheduled = false;
                    try {
                        // Check that a savepoint with given name has been set
                        if (!hasSavepoint(tid, name))
                            throw new SQLException(Translate.get(
                                    "transaction.savepoint.not.found",
                                    new String[] { name,
                                            String.valueOf(transactionId) }));

                        // Wait for the scheduler to give us the authorization to execute
                        scheduler.releaseSavepoint(tm, name, null);
                        releasedSavepointScheduled = true;

                        if (logger.isDebugEnabled())
                            logger.debug(Translate.get(
                                    "transaction.releasesavepoint",
                                    new String[] { name,
                                            String.valueOf(transactionId) }));

                        // Send to loadbalancer
                        loadBalancer.releaseSavepoint(tm, name);

                        // Update the recovery log
                        if (recoveryLog != null)
                            recoveryLog.logRequestCompletion(tm.getLogId(),
                                    true, 0);
                    } catch (SQLException e) {
                        /*
                         * Check if we have to remove the entry from the total order queue
                         * (wait to be sure that we do it in the proper order). Note that
                         * this path is also taken when the savepoint was not found above.
                         */
                        DistributedReleaseSavepoint totalOrderReleaseSavepoint = new DistributedReleaseSavepoint(
                                transactionId, name);
                        if (loadBalancer.waitForTotalOrder(
                                totalOrderReleaseSavepoint, false))
                            loadBalancer
                                    .removeObjectFromAndNotifyTotalOrderQueue(totalOrderReleaseSavepoint);

                        throw e;
                    } catch (AllBackendsFailedException e) {
                        // Log the failure and report it to the caller as an SQLException
                        String msg = Translate.get(
                                "requestmanager.releasesavepoint.failed.all",
                                new String[] { name,
                                        String.valueOf(transactionId),
                                        e.getMessage() });
                        logger.error(msg);
                        throw new SQLException(msg);
                    } finally {
                        if (releasedSavepointScheduled) {
                            // Notify scheduler for completion
                            scheduler.savepointCompleted(transactionId);
                        }

                        // Remove savepoint for the transaction
                        // Will log an error in case of an invalid savepoint
                        removeSavepoint(tid, name);
                    }
                } catch (RuntimeException e) {
                    // Unexpected failure: log it as fatal on both loggers and convert it
                    // to an SQLException (only the message is propagated to the caller)
                    String msg = Translate
                            .get("fatal.runtime.exception.requestmanager.releasesavepoint");
                    logger.fatal(msg, e);
                    endUserLogger.fatal(msg);
                    throw new SQLException(e.getMessage());
                }
            }
2409:
2410:            /**
2411:             * Adds a given savepoint to a given transaction
2412:             *
2413:             * @param tid transaction id
2414:             * @param savepointName name of the savepoint
2415:             */
2416:            public void addSavepoint(Long tid, String savepointName) {
2417:                LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2418:                if (savepoints == null) { // Lazy list creation
2419:                    savepoints = new LinkedList();
2420:                    tidSavepoints.put(tid, savepoints);
2421:                }
2422:
2423:                savepoints.addLast(savepointName);
2424:            }
2425:
2426:            /**
2427:             * Removes a given savepoint for a given transaction
2428:             *
2429:             * @param tid transaction id
2430:             * @param savepointName name of the savepoint
2431:             */
2432:            public void removeSavepoint(Long tid, String savepointName) {
2433:                LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2434:                if (savepoints == null)
2435:                    logger.error("No savepoints found for transaction " + tid);
2436:                else
2437:                    savepoints.remove(savepointName);
2438:            }
2439:
2440:            /**
2441:             * Removes all the savepoints set after a given savepoint for a given
2442:             * transaction
2443:             *
2444:             * @param tid transaction id
2445:             * @param savepointName name of the savepoint
2446:             */
2447:            public void removeSavepoints(Long tid, String savepointName) {
2448:                LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2449:                if (savepoints == null) {
2450:                    logger.error("No savepoints found for transaction " + tid);
2451:                    return;
2452:                }
2453:
2454:                int index = savepoints.indexOf(savepointName);
2455:                if (index == -1)
2456:                    logger.error("No savepoint with name " + savepointName
2457:                            + " found " + "for transaction " + tid);
2458:                else if (index < savepoints.size())
2459:                    savepoints.subList(index + 1, savepoints.size()).clear();
2460:            }
2461:
2462:            /**
2463:             * Check if a given savepoint has been set for a given transaction
2464:             *
2465:             * @param tid transaction id
2466:             * @param savepointName name of the savepoint
2467:             * @return true if the savepoint exists
2468:             */
2469:            public boolean hasSavepoint(Long tid, String savepointName) {
2470:                LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2471:                if (savepoints == null)
2472:                    return false;
2473:
2474:                return savepoints.contains(savepointName);
2475:            }
2476:
2477:            //
2478:            // Database Backends management
2479:            //
2480:
2481:            /**
2482:             * Enable a backend that has been previously added to this virtual database
2483:             * and that is in the disabled state.
2484:             * <p>
2485:             * The backend is enabled without further check.
2486:             * <p>
2487:             * The enableBackend method of the load balancer is called.
2488:             *
2489:             * @param db The database backend to enable
2490:             * @throws SQLException if an error occurs
2491:             */
2492:            public void enableBackend(DatabaseBackend db) throws SQLException {
2493:                db
2494:                        .setSchemaIsNeededByVdb(requiredParsingGranularity != ParsingGranularities.NO_PARSING);
2495:                loadBalancer.enableBackend(db, true);
2496:                logger.info(Translate
2497:                        .get("backend.state.enabled", db.getName()));
2498:            }
2499:
2500:            /**
2501:             * The backend must have been previously added to this virtual database and be
2502:             * in the disabled state.
2503:             * <p>
2504:             * All the queries since the given checkpoint are played and the backend state
2505:             * is set to enabled when it is completely synchronized.
2506:             * <p>
2507:             * Note that the job is performed in background by a
2508:             * <code>RecoverThread</code>. You can synchronize on thread termination if
2509:             * you need to wait for completion of this task and listen to JMX
2510:             * notifications for the success status.
2511:             *
2512:             * @param db The database backend to enable
2513:             * @param checkpointName The checkpoint name to restart from
2514:             * @return the JDBC reocver thread synchronizing the backend
2515:             * @throws SQLException if an error occurs
2516:             */
2517:            public RecoverThread enableBackendFromCheckpoint(
2518:                    DatabaseBackend db, String checkpointName)
2519:                    throws SQLException {
2520:                // Sanity checks
2521:                if (recoveryLog == null) {
2522:                    String msg = Translate.get(
2523:                            "recovery.restore.checkpoint.failed.cause.null",
2524:                            checkpointName);
2525:                    logger.error(msg);
2526:                    throw new SQLException(msg);
2527:                }
2528:
2529:                if (db.getStateValue() != BackendState.DISABLED)
2530:                    throw new SQLException(Translate.get(
2531:                            "recovery.restore.backend.state.invalid", db
2532:                                    .getName()));
2533:
2534:                db
2535:                        .setSchemaIsNeededByVdb(requiredParsingGranularity != ParsingGranularities.NO_PARSING);
2536:
2537:                RecoverThread recoverThread = new RecoverThread(scheduler,
2538:                        recoveryLog, db, this , checkpointName);
2539:
2540:                // fire the thread and forget
2541:                // exception will be reported in a jmx notification on the backend.
2542:                recoverThread.start();
2543:                return recoverThread;
2544:            }
2545:
2546:            /**
2547:             * Disable a backend that is currently enabled on this virtual database.
2548:             * <p>
2549:             * The backend is disabled without further check.
2550:             * <p>
2551:             * The load balancer disabled method is called on the specified backend.
2552:             *
2553:             * @param db The database backend to disable
2554:             * @param forceDisable true if disabling must be forced on the backend
2555:             * @throws SQLException if an error occurs
2556:             */
2557:            public void disableBackend(DatabaseBackend db, boolean forceDisable)
2558:                    throws SQLException {
2559:                if (db.isReadEnabled() || db.isWriteEnabled()) {
2560:                    loadBalancer.disableBackend(db, forceDisable);
2561:                    logger.info(Translate.get("backend.state.disabled", db
2562:                            .getName()));
2563:                } else {
2564:                    throw new SQLException(Translate.get(
2565:                            "backend.already.disabled", db.getName()));
2566:                }
2567:            }
2568:
2569:            /**
2570:             * The backend must belong to this virtual database and be in the enabled
2571:             * state. A checkpoint is stored with the given name and the backend is set to
2572:             * the disabling state. The method then wait for all in-flight transactions to
2573:             * complete before disabling completely the backend and storing its last known
2574:             * checkpoint (upon success only).
2575:             *
2576:             * @param db The database backend to disable
2577:             * @param checkpointName The checkpoint name to store
2578:             * @throws SQLException if an error occurs
2579:             */
2580:            public void disableBackendWithCheckpoint(DatabaseBackend db,
2581:                    String checkpointName) throws SQLException {
2582:                ArrayList backendsArrayList = new ArrayList();
2583:                backendsArrayList.add(new BackendInfo(db));
2584:                disableBackendsWithCheckpoint(backendsArrayList, checkpointName);
2585:            }
2586:
2587:            /**
2588:             * Disable a list of backends. Only to store only one checkpoint, and to
2589:             * disable all the backends at the same time so the the system is in a
2590:             * coherent state. Consider only the backends that were enabled. The others
2591:             * are left in the state they were before.
2592:             *
2593:             * @param backendInfos a List of BackendInfo to disable
2594:             * @param checkpointName to store
2595:             * @throws SQLException if an error occurs
2596:             */
2597:            public void disableBackendsWithCheckpoint(
2598:                    final ArrayList/* <BackendInfo> */backendInfos,
2599:                    String checkpointName) throws SQLException {
2600:                // Sanity checks
2601:                if (recoveryLog == null) {
2602:                    String msg = Translate.get(
2603:                            "recovery.store.checkpoint.failed.cause.null",
2604:                            checkpointName);
2605:                    logger.error(msg);
2606:                    throw new SQLException(msg);
2607:                }
2608:
2609:                // Wait for all pending writes to finish
2610:                logger
2611:                        .info(Translate
2612:                                .get("requestmanager.wait.pending.writes"));
2613:                scheduler.suspendNewWrites();
2614:                scheduler.waitForSuspendedWritesToComplete();
2615:
2616:                // Store checkpoint
2617:                recoveryLog.storeCheckpoint(checkpointName);
2618:                logger.info(Translate.get("recovery.checkpoint.stored",
2619:                        checkpointName));
2620:
2621:                // Copy the list and consider only the backends that are enabled
2622:                List backendsToDisable = new ArrayList();
2623:                Iterator iter = backendInfos.iterator();
2624:                while (iter.hasNext()) {
2625:                    BackendInfo backendInfo = (BackendInfo) iter.next();
2626:                    DatabaseBackend db;
2627:                    try {
2628:                        db = vdb.getAndCheckBackend(backendInfo.getName(),
2629:                                VirtualDatabase.NO_CHECK_BACKEND);
2630:                        if (db.isWriteEnabled()) {
2631:                            backendsToDisable.add(db);
2632:                        }
2633:                    } catch (VirtualDatabaseException e) {
2634:                        if (logger.isWarnEnabled()) {
2635:                            logger.warn(e.getMessage(), e);
2636:                        }
2637:                    }
2638:                }
2639:
2640:                // Signal all backends that they should not begin any new transaction
2641:                int size = backendsToDisable.size();
2642:                for (int i = 0; i < size; i++) {
2643:                    DatabaseBackend db = (DatabaseBackend) backendsToDisable
2644:                            .get(i);
2645:                    db.setState(BackendState.DISABLING);
2646:                    logger.info(Translate.get("backend.state.disabling", db
2647:                            .getName()));
2648:                }
2649:
2650:                // Resume writes
2651:                logger.info(Translate
2652:                        .get("requestmanager.resume.pending.writes"));
2653:                scheduler.resumeWrites();
2654:
2655:                // Wait for all current transactions on backends to finish
2656:                for (int i = 0; i < size; i++) {
2657:                    DatabaseBackend db = (DatabaseBackend) backendsToDisable
2658:                            .get(i);
2659:                    db
2660:                            .waitForAllTransactionsAndPersistentConnectionsToComplete();
2661:                }
2662:
2663:                // Now we can safely disable all backends
2664:                for (int i = 0; i < size; i++) {
2665:                    DatabaseBackend db = (DatabaseBackend) backendsToDisable
2666:                            .get(i);
2667:                    db.setLastKnownCheckpoint(checkpointName);
2668:                    loadBalancer.disableBackend(db, true);
2669:                    logger.info(Translate.get("backend.state.disabled", db
2670:                            .getName()));
2671:                }
2672:            }
2673:
2674:            /**
2675:             * Create a backup from the content of a backend.
2676:             *
2677:             * @param backend the target backend to backup
2678:             * @param login the login to use to connect to the database for the backup
2679:             *          operation
2680:             * @param password the password to use to connect to the database for the
2681:             *          backup operation
2682:             * @param dumpName the name of the dump to create
2683:             * @param backuperName the logical name of the backuper to use
2684:             * @param path the path where to store the dump
2685:             * @param tables the list of tables to backup, null means all tables
2686:             * @throws SQLException if the backup fails
2687:             */
2688:            public void backupBackend(DatabaseBackend backend, String login,
2689:                    String password, String dumpName, String backuperName,
2690:                    String path, ArrayList tables) throws SQLException {
2691:                Backuper backuper = backupManager
2692:                        .getBackuperByName(backuperName);
2693:                if (backuper == null)
2694:                    throw new SQLException("No backuper named " + backuperName
2695:                            + " was found.");
2696:
2697:                boolean enableAfter = false;
2698:                String checkpointName = vdb.buildCheckpointName("backup "
2699:                        + dumpName);
2700:                if (backend.isReadEnabled() || backend.isWriteEnabled()) { // Disable backend and store checkpoint
2701:                    disableBackendWithCheckpoint(backend, checkpointName);
2702:                    logger.info(Translate.get("backend.state.disabled", backend
2703:                            .getName()));
2704:                    enableAfter = true;
2705:                } else {
2706:                    if (backend.getLastKnownCheckpoint() == null) {
2707:                        throw new SQLException(Translate.get(
2708:                                "controller.backup.no.lastknown.checkpoint",
2709:                                backend.getName()));
2710:                    }
2711:                }
2712:                // else backend is already disabled, no checkpoint is stored here, it should
2713:                // have been done at disable time.
2714:
2715:                try {
2716:                    logger.info(Translate.get("controller.backup.start",
2717:                            backend.getName()));
2718:
2719:                    // Sanity check to be sure that no pending request is in the pipe for the
2720:                    // backend
2721:                    Vector pending = backend.getPendingRequests();
2722:                    if (pending.size() != 0) {
2723:                        if (logger.isWarnEnabled()) {
2724:                            int shortFormLength = this .getVirtualDatabase()
2725:                                    .getSqlShortFormLength();
2726:                            for (int i = 0; i < pending.size(); i++)
2727:                                logger
2728:                                        .warn("Pending:"
2729:                                                + ((AbstractRequest) pending
2730:                                                        .get(i))
2731:                                                        .toStringShortForm(shortFormLength));
2732:
2733:                            logger.warn("Pending Requests:"
2734:                                    + backend.getPendingRequests().size());
2735:                            logger.warn("Read enabled:"
2736:                                    + backend.isReadEnabled());
2737:                            logger.warn("Write enabled:"
2738:                                    + backend.isWriteEnabled());
2739:                        }
2740:                        throw new BackupException(Translate.get(
2741:                                "backend.not.ready.for.backup", pending.size()));
2742:                    }
2743:
2744:                    // Let's start the backup
2745:                    backend.setState(BackendState.BACKUPING);
2746:                    Date dumpDate = backuper.backup(backend, login, password,
2747:                            dumpName, path, tables);
2748:                    if (recoveryLog != null) {
2749:                        DumpInfo dump = new DumpInfo(dumpName, dumpDate, path,
2750:                                backuper.getDumpFormat(), backend
2751:                                        .getLastKnownCheckpoint(), backend
2752:                                        .getName(), "*");
2753:                        recoveryLog.storeDump(dump);
2754:                    }
2755:
2756:                    // Notify that a new dump is available
2757:                    backend
2758:                            .notifyJmx(SequoiaNotificationList.VIRTUALDATABASE_NEW_DUMP_LIST);
2759:                } catch (BackupException be) {
2760:                    logger.error(Translate.get("controller.backup.failed"), be);
2761:                    throw new SQLException(be.getMessage());
2762:                } catch (RuntimeException e) {
2763:                    logger.error(Translate.get("controller.backup.failed"), e);
2764:                    throw new SQLException(e.getMessage());
2765:                } finally {
2766:                    // Switch from BACKUPING to DISABLED STATE
2767:                    backend.setState(BackendState.DISABLED);
2768:                }
2769:
2770:                logger.info(Translate.get("controller.backup.complete", backend
2771:                        .getName()));
2772:
2773:                if (enableAfter) {
2774:                    RecoverThread thread = enableBackendFromCheckpoint(backend,
2775:                            checkpointName);
2776:                    try {
2777:                        thread.join();
2778:                    } catch (InterruptedException e) {
2779:                        logger.error("Recovery thread has been interrupted", e);
2780:                    }
2781:                }
2782:
2783:            }
2784:
2785:            /**
2786:             * Restore a dump on a specific backend. The proper Backuper is retrieved
2787:             * automatically according to the dump format stored in the recovery log dump
2788:             * table.
2789:             * <p>
2790:             * This method disables the backend and leave it disabled after recovery
2791:             * process. The user has to call the <code>enableBackendFromCheckpoint</code>
2792:             * after this.
2793:             *
2794:             * @param backend the backend to restore
2795:             * @param login the login to use to connect to the database for the restore
2796:             *          operation
2797:             * @param password the password to use to connect to the database for the
2798:             *          restore operation
2799:             * @param dumpName the name of the dump to restore
2800:             * @param tables the list of tables to restore, null means all tables
2801:             * @throws BackupException if the restore operation failed
2802:             */
2803:            public void restoreBackendFromBackupCheckpoint(
2804:                    DatabaseBackend backend, String login, String password,
2805:                    String dumpName, ArrayList tables) throws BackupException {
2806:                DumpInfo dumpInfo;
2807:                try {
2808:                    dumpInfo = recoveryLog.getDumpInfo(dumpName);
2809:                } catch (SQLException e) {
2810:                    throw new BackupException(
2811:                            "Recovery log error access occured while retrieving information for dump "
2812:                                    + dumpName, e);
2813:                }
2814:                if (dumpInfo == null)
2815:                    throw new BackupException(
2816:                            "No information was found in the dump table for dump "
2817:                                    + dumpName);
2818:
2819:                Backuper backuper = backupManager.getBackuperByFormat(dumpInfo
2820:                        .getDumpFormat());
2821:                if (backuper == null)
2822:                    throw new BackupException(
2823:                            "No backuper was found to handle dump format "
2824:                                    + dumpInfo.getDumpFormat());
2825:
2826:                if (backend.isReadEnabled())
2827:                    throw new BackupException(
2828:                            "Unable to restore a dump on an active backend. Disable the backend first.");
2829:
2830:                try {
2831:                    backend.setState(BackendState.RESTORING);
2832:
2833:                    backuper.restore(backend, login, password, dumpName,
2834:                            dumpInfo.getDumpPath(), tables);
2835:
2836:                    // Set the checkpoint name corresponding to this database dump
2837:                    backend
2838:                            .setLastKnownCheckpoint(dumpInfo
2839:                                    .getCheckpointName());
2840:                    backend.setState(BackendState.DISABLED);
2841:                } catch (BackupException be) {
2842:                    backend.setState(BackendState.UNKNOWN);
2843:                    logger.error(Translate
2844:                            .get("controller.backup.recovery.failed"), be);
2845:                    throw be;
2846:                } finally {
2847:                    logger.info(Translate.get(
2848:                            "controller.backup.recovery.done", backend
2849:                                    .getName()));
2850:                }
2851:            }
2852:
2853:            /**
2854:             * Store all the backends checkpoint in the recoverylog
2855:             *
2856:             * @param databaseName the virtual database name
2857:             * @param backends the <code>Arraylist</code> of backends
2858:             */
2859:            public void storeBackendsInfo(String databaseName,
2860:                    ArrayList backends) {
2861:                if (recoveryLog == null)
2862:                    return;
2863:                int size = backends.size();
2864:                DatabaseBackend backend;
2865:                for (int i = 0; i < size; i++) {
2866:                    backend = (DatabaseBackend) backends.get(i);
2867:                    try {
2868:                        recoveryLog.storeBackendRecoveryInfo(databaseName,
2869:                                new BackendRecoveryInfo(backend.getName(),
2870:                                        backend.getLastKnownCheckpoint(),
2871:                                        backend.getStateValue(), databaseName));
2872:                    } catch (SQLException e) {
2873:                        logger.error(Translate.get(
2874:                                "recovery.store.checkpoint.failed",
2875:                                new String[] { backend.getName(),
2876:                                        e.getMessage() }), e);
2877:                    }
2878:                }
2879:            }
2880:
2881:            /**
2882:             * Remove a checkpoint and corresponding entries from the log table
2883:             *
2884:             * @param checkpointName to remove
2885:             */
2886:            public void removeCheckpoint(String checkpointName) {
2887:                recoveryLog.removeCheckpoint(checkpointName);
2888:            }
2889:
2890:            //
2891:            // Database schema management
2892:            //
2893:
2894:            /**
2895:             * Get the <code>DatabaseSchema</code> used by this Request Manager.
2896:             *
2897:             * @return a <code>DatabaseSchema</code> value
2898:             */
2899:            public synchronized DatabaseSchema getDatabaseSchema() {
2900:                try {
2901:                    // Refresh schema if needed. Note that this will break static schemas if
2902:                    // any
2903:                    if (schemaIsDirty) {
2904:                        if (logger.isDebugEnabled())
2905:                            logger
2906:                                    .debug("Database schema is dirty, refreshing it");
2907:                        DatabaseSchema activeSchema = vdb
2908:                                .getDatabaseSchemaFromActiveBackends();
2909:                        if (activeSchema != null) {
2910:                            if (dbs == null)
2911:                                dbs = activeSchema;
2912:                            else
2913:                                dbs.mergeSchema(activeSchema);
2914:                            setSchemaIsDirty(false);
2915:                        }
2916:                    }
2917:                } catch (SQLException e) {
2918:                    logger.error("Unable to refresh schema", e);
2919:                }
2920:                return dbs;
2921:            }
2922:
2923:            /**
2924:             * Merge the given schema with the existing database schema.
2925:             *
2926:             * @param backendSchema The virtual database schema to merge.
2927:             */
2928:            public synchronized void mergeDatabaseSchema(
2929:                    DatabaseSchema backendSchema) {
2930:                try {
2931:                    if (dbs == null)
2932:                        setDatabaseSchema(new DatabaseSchema(backendSchema),
2933:                                false);
2934:                    else {
2935:                        dbs.mergeSchema(backendSchema);
2936:                        logger
2937:                                .info(Translate
2938:                                        .get("requestmanager.schema.virtualdatabase.merged.new"));
2939:
2940:                        if (schedulerParsingranularity != ParsingGranularities.NO_PARSING)
2941:                            scheduler.mergeDatabaseSchema(dbs);
2942:
2943:                        if (cacheParsingranularity != ParsingGranularities.NO_PARSING)
2944:                            resultCache.mergeDatabaseSchema(dbs);
2945:                    }
2946:                } catch (SQLException e) {
2947:                    logger.error(Translate.get(
2948:                            "requestmanager.schema.merge.failed", e
2949:                                    .getMessage()), e);
2950:                }
2951:            }
2952:
2953:            /**
2954:             * Sets the <code>DatabaseSchema</code> to be able to parse the requests and
2955:             * find dependencies.
2956:             *
2957:             * @param schema a <code>DatabaseSchema</code> value
2958:             * @param isStatic true if the given schema is static
2959:             */
2960:            public synchronized void setDatabaseSchema(DatabaseSchema schema,
2961:                    boolean isStatic) {
2962:                if (schemaIsStatic) {
2963:                    if (isStatic) {
2964:                        logger
2965:                                .warn(Translate
2966:                                        .get("requestmanager.schema.replace.static.with.new"));
2967:                        this .dbs = schema;
2968:                    } else
2969:                        logger
2970:                                .info(Translate
2971:                                        .get("requestmanager.schema.ignore.new.dynamic"));
2972:                } else {
2973:                    schemaIsStatic = isStatic;
2974:                    this .dbs = schema;
2975:                    logger
2976:                            .info(Translate
2977:                                    .get("requestmanager.schema.set.new.virtualdatabase"));
2978:                }
2979:
2980:                if (schedulerParsingranularity != ParsingGranularities.NO_PARSING)
2981:                    scheduler.setDatabaseSchema(dbs);
2982:
2983:                if (cacheParsingranularity != ParsingGranularities.NO_PARSING)
2984:                    resultCache.setDatabaseSchema(dbs);
2985:
2986:                // Load balancers do not have a specific database schema to update
2987:            }
2988:
2989:            /**
2990:             * Sets the schemaIsDirty value if the backend schema needs to be refreshed.
2991:             *
2992:             * @param schemaIsDirty The schemaIsDirty to set.
2993:             */
2994:            public synchronized void setSchemaIsDirty(boolean schemaIsDirty) {
2995:                this .schemaIsDirty = schemaIsDirty;
2996:            }
2997:
2998:            //
2999:            // Getter/Setter methods
3000:            //
3001:
3002:            /**
3003:             * Returns the vdb value.
3004:             *
3005:             * @return Returns the vdb.
3006:             */
3007:            public VirtualDatabase getVirtualDatabase() {
3008:                return vdb;
3009:            }
3010:
3011:            /**
3012:             * Sets the backup manager for this recovery log
3013:             *
3014:             * @param currentBackupManager an instance of <code>BackupManager</code>
3015:             */
3016:            public void setBackupManager(BackupManager currentBackupManager) {
3017:                this .backupManager = currentBackupManager;
3018:            }
3019:
3020:            /**
3021:             * Returns the backupManager value.
3022:             *
3023:             * @return Returns the backupManager.
3024:             */
3025:            public BackupManager getBackupManager() {
3026:                return backupManager;
3027:            }
3028:
3029:            /**
3030:             * Get the Request Load Balancer used in this Request Controller.
3031:             *
3032:             * @return an <code>AbstractLoadBalancer</code> value
3033:             */
3034:            public AbstractLoadBalancer getLoadBalancer() {
3035:                return loadBalancer;
3036:            }
3037:
3038:            /**
3039:             * Set the Request Load Balancer to use in this Request Controller.
3040:             *
3041:             * @param loadBalancer a Request Load Balancer implementation
3042:             */
3043:            public void setLoadBalancer(AbstractLoadBalancer loadBalancer) {
3044:                if (this .loadBalancer != null)
3045:                    throw new RuntimeException(
3046:                            "It is not possible to dynamically change a load balancer.");
3047:                this .loadBalancer = loadBalancer;
3048:                if (loadBalancer == null)
3049:                    return;
3050:                loadBalancerParsingranularity = loadBalancer
3051:                        .getParsingGranularity();
3052:                if (loadBalancerParsingranularity > requiredParsingGranularity)
3053:                    requiredParsingGranularity = loadBalancerParsingranularity;
3054:
3055:                if (MBeanServerManager.isJmxEnabled()) {
3056:                    try {
3057:                        MBeanServerManager.registerMBean(
3058:                                new LoadBalancerControl(loadBalancer),
3059:                                JmxConstants.getLoadBalancerObjectName(vdb
3060:                                        .getVirtualDatabaseName()));
3061:                    } catch (Exception e) {
3062:                        logger.error(Translate
3063:                                .get("jmx.failed.register.mbean.loadbalancer"));
3064:                    }
3065:                }
3066:
3067:            }
3068:
3069:            /**
3070:             * Get the result cache (if any) used in this Request Manager.
3071:             *
3072:             * @return an <code>AbstractResultCache</code> value or null if no Reqsult
3073:             *         Cache has been defined
3074:             */
3075:            public AbstractResultCache getResultCache() {
3076:                return resultCache;
3077:            }
3078:
3079:            /**
3080:             * Returns the metadataCache value.
3081:             *
3082:             * @return Returns the metadataCache.
3083:             */
3084:            public MetadataCache getMetadataCache() {
3085:                return metadataCache;
3086:            }
3087:
3088:            /**
3089:             * Sets the metadataCache value.
3090:             *
3091:             * @param metadataCache The metadataCache to set.
3092:             */
3093:            public void setMetadataCache(MetadataCache metadataCache) {
3094:                this .metadataCache = metadataCache;
3095:            }
3096:
3097:            /**
3098:             * Sets the ParsingCache.
3099:             *
3100:             * @param parsingCache The parsingCache to set.
3101:             */
3102:            public void setParsingCache(ParsingCache parsingCache) {
3103:                parsingCache.setRequestManager(this );
3104:                parsingCache.setGranularity(requiredParsingGranularity);
3105:                parsingCache.setCaseSensitiveParsing(isCaseSensitiveParsing);
3106:                this .parsingCache = parsingCache;
3107:            }
3108:
3109:            /**
3110:             * Returns the Recovery Log Manager.
3111:             *
3112:             * @return RecoveryLog the current recovery log (null if none)
3113:             */
3114:            public RecoveryLog getRecoveryLog() {
3115:                return recoveryLog;
3116:            }
3117:
3118:            /**
3119:             * Sets the Recovery Log Manager.
3120:             *
3121:             * @param recoveryLog The log recovery to set
3122:             */
3123:            public void setRecoveryLog(RecoveryLog recoveryLog) {
3124:                if (recoveryLog == null)
3125:                    return;
3126:                this .recoveryLog = recoveryLog;
3127:                loadBalancer.setRecoveryLog(recoveryLog);
3128:                ArrayList backends = vdb.getBackends();
3129:                int size = backends.size();
3130:                backendStateListener = new BackendStateListener(vdb
3131:                        .getVirtualDatabaseName(), recoveryLog);
3132:                for (int i = 0; i < size; i++)
3133:                    ((DatabaseBackend) backends.get(i))
3134:                            .setStateListener(backendStateListener);
3135:            }
3136:
3137:            /**
3138:             * Set the Request Cache to use in this Request Controller.
3139:             *
3140:             * @param cache a Request Cache implementation
3141:             */
3142:            public void setResultCache(AbstractResultCache cache) {
3143:                resultCache = cache;
3144:                cacheParsingranularity = cache.getParsingGranularity();
3145:                if (cacheParsingranularity > requiredParsingGranularity)
3146:                    requiredParsingGranularity = cacheParsingranularity;
3147:            }
3148:
3149:            /**
3150:             * Get the Request Scheduler (if any) used in this Request Controller.
3151:             *
3152:             * @return an <code>AbstractScheduler</code> value or null if no Request
3153:             *         Scheduler has been defined
3154:             */
3155:            public AbstractScheduler getScheduler() {
3156:                return scheduler;
3157:            }
3158:
3159:            /**
3160:             * Set the Request Scheduler to use in this Request Controller.
3161:             *
3162:             * @param scheduler a Request Scheduler implementation
3163:             */
3164:            public void setScheduler(AbstractScheduler scheduler) {
3165:                this .scheduler = scheduler;
3166:                schedulerParsingranularity = scheduler.getParsingGranularity();
3167:                if (schedulerParsingranularity > requiredParsingGranularity)
3168:                    requiredParsingGranularity = schedulerParsingranularity;
3169:            }
3170:
3171:            /**
3172:             * Sets the parsing case sensitivity. If true the request are parsed in a case
3173:             * sensitive way (table/column name must match exactly the case of the names
3174:             * fetched from the database or enforced by a static schema).
3175:             *
3176:             * @param isCaseSensitiveParsing true if parsing is case sensitive
3177:             */
3178:            public void setCaseSensitiveParsing(boolean isCaseSensitiveParsing) {
3179:                this .isCaseSensitiveParsing = isCaseSensitiveParsing;
3180:                if (parsingCache != null)
3181:                    parsingCache
3182:                            .setCaseSensitiveParsing(isCaseSensitiveParsing);
3183:            }
3184:
3185:            //
3186:            // Debug/Monitoring
3187:            //
3188:
3189:            /**
3190:             * Get xml information about this Request Manager
3191:             *
3192:             * @return <code>String</code> in xml formatted text
3193:             */
3194:            public String getXml() {
3195:                StringBuffer info = new StringBuffer();
3196:                info.append("<" + DatabasesXmlTags.ELT_RequestManager + " "
3197:                        + DatabasesXmlTags.ATT_caseSensitiveParsing + "=\""
3198:                        + isCaseSensitiveParsing + "\" "
3199:                        + DatabasesXmlTags.ATT_beginTimeout + "=\""
3200:                        + beginTimeout / 1000 + "\" "
3201:                        + DatabasesXmlTags.ATT_commitTimeout + "=\""
3202:                        + commitTimeout / 1000 + "\" "
3203:                        + DatabasesXmlTags.ATT_rollbackTimeout + "=\""
3204:                        + rollbackTimeout / 1000 + "\">");
3205:                if (scheduler != null)
3206:                    info.append(scheduler.getXml());
3207:
3208:                if (metadataCache != null || parsingCache != null
3209:                        || resultCache != null) {
3210:                    info.append("<" + DatabasesXmlTags.ELT_RequestCache + ">");
3211:                    if (metadataCache != null)
3212:                        info.append(metadataCache.getXml());
3213:                    if (parsingCache != null)
3214:                        info.append(parsingCache.getXml());
3215:                    if (resultCache != null)
3216:                        info.append(resultCache.getXml());
3217:                    info.append("</" + DatabasesXmlTags.ELT_RequestCache + ">");
3218:                }
3219:
3220:                if (loadBalancer != null)
3221:                    info.append(loadBalancer.getXml());
3222:                if (recoveryLog != null)
3223:                    info.append(this .recoveryLog.getXml());
3224:                info.append("</" + DatabasesXmlTags.ELT_RequestManager + ">");
3225:                return info.toString();
3226:            }
3227:
3228:            /**
3229:             * Returns the backendStateListener value.
3230:             *
3231:             * @return Returns the backendStateListener.
3232:             */
3233:            public BackendStateListener getBackendStateListener() {
3234:                return backendStateListener;
3235:            }
3236:
3237:            /**
3238:             * Returns the beginTimeout value.
3239:             *
3240:             * @return Returns the beginTimeout.
3241:             */
3242:            public long getBeginTimeout() {
3243:                return beginTimeout;
3244:            }
3245:
3246:            /**
3247:             * Returns the cacheParsingranularity value.
3248:             *
3249:             * @return Returns the cacheParsingranularity.
3250:             */
3251:            public int getCacheParsingranularity() {
3252:                return cacheParsingranularity;
3253:            }
3254:
3255:            /**
3256:             * Sets the cacheParsingranularity value.
3257:             *
3258:             * @param cacheParsingranularity The cacheParsingranularity to set.
3259:             */
3260:            public void setCacheParsingranularity(int cacheParsingranularity) {
3261:                this .cacheParsingranularity = cacheParsingranularity;
3262:            }
3263:
3264:            /**
3265:             * Returns the commitTimeout value.
3266:             *
3267:             * @return Returns the commitTimeout.
3268:             */
3269:            public long getCommitTimeout() {
3270:                return commitTimeout;
3271:            }
3272:
3273:            /**
3274:             * Returns the number of savepoints that are defined for a given transaction
3275:             *
3276:             * @param tId the transaction id
3277:             * @return the number of savepoints that are defined in the transaction whose
3278:             *         id is tId
3279:             */
3280:            public int getNumberOfSavepointsInTransaction(long tId) {
3281:                LinkedList savepoints = (LinkedList) tidSavepoints
3282:                        .get(new Long(tId));
3283:                if (savepoints == null) {
3284:                    return 0;
3285:                } else
3286:                    return savepoints.size();
3287:            }
3288:
3289:            /**
3290:             * Returns the requiredParsingGranularity value.
3291:             *
3292:             * @return Returns the requiredParsingGranularity.
3293:             */
3294:            public int getRequiredParsingGranularity() {
3295:                return requiredParsingGranularity;
3296:            }
3297:
3298:            /**
3299:             * Returns the rollbackTimeout value.
3300:             *
3301:             * @return Returns the rollbackTimeout.
3302:             */
3303:            public long getRollbackTimeout() {
3304:                return rollbackTimeout;
3305:            }
3306:
3307:            /**
3308:             * Returns the schedulerParsingranularity value.
3309:             *
3310:             * @return Returns the schedulerParsingranularity.
3311:             */
3312:            public int getSchedulerParsingranularity() {
3313:                return schedulerParsingranularity;
3314:            }
3315:
3316:            /**
3317:             * Sets the schedulerParsingranularity value.
3318:             *
3319:             * @param schedulerParsingranularity The schedulerParsingranularity to set.
3320:             */
3321:            public void setSchedulerParsingranularity(
3322:                    int schedulerParsingranularity) {
3323:                this .schedulerParsingranularity = schedulerParsingranularity;
3324:            }
3325:
3326:            /**
3327:             * Returns the schemaIsStatic value.
3328:             *
3329:             * @return Returns the schemaIsStatic.
3330:             */
3331:            public boolean isStaticSchema() {
3332:                return schemaIsStatic;
3333:            }
3334:
3335:            /**
3336:             * Sets the schemaIsStatic value.
3337:             *
3338:             * @param schemaIsStatic The schemaIsStatic to set.
3339:             */
3340:            public void setStaticSchema(boolean schemaIsStatic) {
3341:                this .schemaIsStatic = schemaIsStatic;
3342:            }
3343:
3344:            /**
3345:             * Returns the isCaseSensitiveParsing value.
3346:             *
3347:             * @return Returns the isCaseSensitiveParsing.
3348:             */
3349:            public boolean isCaseSensitiveParsing() {
3350:                return isCaseSensitiveParsing;
3351:            }
3352:
3353:            /**
3354:             * @see org.continuent.sequoia.controller.jmx.AbstractStandardMBean#getAssociatedString()
3355:             */
3356:            public String getAssociatedString() {
3357:                return "requestmanager";
3358:            }
3359:
3360:            /**
3361:             * Resume all transactions, writes and persistent connections.
3362:             */
3363:            public void resumeActivity() {
3364:                scheduler.resumeWrites();
3365:                scheduler.resumeNewTransactions();
3366:                scheduler.resumeOpenClosePersistentConnection();
3367:            }
3368:
3369:            /**
3370:             * Suspend all transactions, writes and persistent connections.
3371:             *
3372:             * @throws SQLException if an error occured while waiting for writes or
3373:             *           transactions to complete
3374:             */
3375:            public void suspendActivity() throws SQLException {
3376:                // Suspend opening and closing of persistent connections
3377:                scheduler.suspendOpenClosePersistentConnection();
3378:
3379:                // Suspend new transactions
3380:                scheduler.suspendNewTransactions();
3381:
3382:                // Suspend new writes
3383:                scheduler.suspendNewWrites();
3384:
3385:                // Wait for suspended activity to complete
3386:                scheduler.waitForSuspendedTransactionsToComplete();
3387:                scheduler.waitForSuspendedWritesToComplete();
3388:                scheduler.waitForPendingOpenClosePersistentConnection();
3389:            }
3390:
3391:            /**
3392:             * {@inheritDoc}
3393:             *
3394:             * @see org.continuent.sequoia.common.jmx.mbeans.RequestManagerMBean#parseSqlRequest(java.lang.String,
3395:             *      java.lang.String)
3396:             */
3397:            public String parseSqlRequest(String sqlRequest,
3398:                    String lineSeparator) {
3399:                RequestFactory requestFactory = ControllerConstants.CONTROLLER_FACTORY
3400:                        .getRequestFactory();
3401:                AbstractRequest request = requestFactory.requestFromString(
3402:                        sqlRequest, false, false, 0, lineSeparator);
3403:                try {
3404:                    request.parse(getDatabaseSchema(),
3405:                            ParsingGranularities.TABLE, isCaseSensitiveParsing);
3406:                } catch (SQLException ignored) {
3407:                }
3408:                return request.getParsingResultsAsString();
3409:            }
3410:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.