Source code for DistributedRequestManager.java (sequoia-2.10.9, package org.continuent.sequoia.controller.requestmanager.distributed)



/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
 * Copyright (C) 2005-2006 Continuent, Inc.
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): Olivier Fambon, Jean-Bernard van Zuylen, Damian Arregui,
 *   Peter Royal.
 */

package org.continuent.sequoia.controller.requestmanager.distributed;

import java.io.Serializable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import javax.management.NotCompliantMBeanException;

import org.continuent.hedera.adapters.MulticastRequestAdapter;
import org.continuent.hedera.adapters.MulticastResponse;
import org.continuent.hedera.common.Member;
import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
import org.continuent.sequoia.common.exceptions.NoResultAvailableException;
import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.jmx.management.BackendInfo;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
import org.continuent.sequoia.controller.backend.result.ExecuteResult;
import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
import org.continuent.sequoia.controller.cache.result.AbstractResultCache;
import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
import org.continuent.sequoia.controller.requestmanager.RequestManager;
import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
import org.continuent.sequoia.controller.requests.AbstractRequest;
import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
import org.continuent.sequoia.controller.requests.SelectRequest;
import org.continuent.sequoia.controller.requests.StoredProcedure;
import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
import org.continuent.sequoia.controller.virtualdatabase.protocol.BlockActivity;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForPersistentConnection;
import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForTransaction;
import org.continuent.sequoia.controller.virtualdatabase.protocol.GetRequestResultFromFailoverCache;
import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyCompletion;
import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyDisableBackend;
import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyEnableBackend;
import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyInconsistency;
import org.continuent.sequoia.controller.virtualdatabase.protocol.ResumeActivity;
import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendActivity;

/**
 * This class defines a Distributed Request Manager.
 * <p>
 * The DRM is composed of a Request Scheduler, an optional Query Cache, a
 * Load Balancer, and an optional Recovery Log. Unlike a non-distributed Request
 * Manager, this implementation is responsible for synchronizing the
 * components (schedulers, ...) of the different controllers. Functions that
 * are RAIDb level dependent are implemented in subclasses.
 *
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
 *         </a>
 * @author <a href="mailto:Damian.Arregui@emicnetworks.com">Damian Arregui</a>
 * @version 1.0
 */
public abstract class DistributedRequestManager extends RequestManager {
    protected DistributedVirtualDatabase dvdb;
    /**
     * Map of requests that failed on all backends. Each value is a
     * FailureInformation recording, among other things, whether the request had
     * been scheduled before the failure, so that we know whether the scheduler
     * must be notified.
     */
    private HashMap failedOnAllBackends;
    /** Unique controller identifier */
    private long controllerId;
    /** List of transactions that have executed on multiple controllers */
    protected LinkedList distributedTransactions;

    /**
     * Constant to acknowledge the successful completion of a distributed query
     */
    public static final Integer SUCCESSFUL_COMPLETION = new Integer(-1);

    /**
     * Builds a new <code>DistributedRequestManager</code> instance.
     *
     * @param vdb the virtual database this request manager belongs to
     * @param scheduler the Request Scheduler to use
     * @param cache a Query Cache implementation
     * @param loadBalancer the Request Load Balancer to use
     * @param recoveryLog the Recovery Log to use
     * @param beginTimeout timeout in seconds for begin
     * @param commitTimeout timeout in seconds for commit
     * @param rollbackTimeout timeout in seconds for rollback
     * @throws SQLException if an error occurs
     * @throws NotCompliantMBeanException if this class is not a compliant JMX
     *           MBean
     */
    public DistributedRequestManager(DistributedVirtualDatabase vdb,
            AbstractScheduler scheduler, AbstractResultCache cache,
            AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog,
            long beginTimeout, long commitTimeout, long rollbackTimeout)
            throws SQLException, NotCompliantMBeanException {
        super(vdb, scheduler, cache, loadBalancer, recoveryLog,
                beginTimeout, commitTimeout, rollbackTimeout);
        dvdb = vdb;
        failedOnAllBackends = new HashMap();
        distributedTransactions = new LinkedList();
    }

    //
    // Controller identifier related functions
    //

    /**
     * Effective controllerIds are on the upper 16 bits of a long (64 bits).
     * Distributed transaction ids (longs) are laid out as [ControllerId (16 bits) |
     * LocalTransactionId (48 bits)]. <br/>This constant is used in
     * DistributedVirtualDatabase.
     */
    public static final long CONTROLLER_ID_BIT_MASK = 0xffff000000000000L;
    /**
     * TRANSACTION_ID_BIT_MASK is used to get the transaction id local to the
     * originating controller
     */
    public static final long TRANSACTION_ID_BIT_MASK = ~CONTROLLER_ID_BIT_MASK;

    /**
     * @see #CONTROLLER_ID_BIT_MASK
     */
    public static final int CONTROLLER_ID_SHIFT_BITS = 48;

    /**
     * @see #CONTROLLER_ID_BIT_MASK
     */
    public static final long CONTROLLER_ID_BITS = 0x000000000000ffffL;

    /**
     * Returns the unique controller identifier.
     *
     * @return Returns the controllerId.
     */
    public long getControllerId() {
        return controllerId;
    }

    /**
     * Sets the controller identifier value (this id must be unique). Parameter id
     * must fit in 16 bits (&lt;= 0xffff), otherwise a RuntimeException is thrown.
     * The effective this.controllerId is <strong>not</strong> set to the passed
     * parameter id, but to id &lt;&lt; CONTROLLER_ID_SHIFT_BITS, because
     * controllerIds are carried in the upper 16 bits of distributed transaction
     * ids.
     *
     * @param id The controllerId to set.
     */
    public void setControllerId(long id) {
        if ((id & ~CONTROLLER_ID_BITS) != 0) {
            String msg = "Out of range controller id (" + id + ")";
            logger.error(msg);
            throw new RuntimeException(msg);
        }
        this.controllerId = (id << CONTROLLER_ID_SHIFT_BITS)
                & CONTROLLER_ID_BIT_MASK;
        if (logger.isDebugEnabled())
            logger.debug("Setting controller identifier to " + id
                    + " (shifted value is " + controllerId + ")");

        scheduler.setControllerId(controllerId);
    }

    /**
     * Make the given persistent connection id unique cluster-wide
     *
     * @param id original id
     * @return unique connection id
     */
    public long getNextConnectionId(long id) {
        // The 2 left-most bytes are used for the controller id
        // The 6 right-most bytes are used for the connection id
        id = id & TRANSACTION_ID_BIT_MASK;
        id = id | controllerId;
        return id;
    }
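
    /*
     * Worked example (illustrative only, not part of the original source; the
     * values 3 and 42 are arbitrary assumptions):
     *
     *   setControllerId(3)      stores  3L << 48              = 0x0003000000000000L
     *   getNextConnectionId(42) returns (42 & 0x0000ffffffffffffL)
     *                                   | 0x0003000000000000L = 0x000300000000002AL
     *
     * The originating controller can be recovered with
     * (id & CONTROLLER_ID_BIT_MASK) >>> CONTROLLER_ID_SHIFT_BITS, which gives 3.
     * setControllerId(0x10000) would be rejected because 0x10000 does not fit
     * in the 16 bits selected by CONTROLLER_ID_BITS.
     */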

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#getNextRequestId()
     */
    public long getNextRequestId() {
        // We use the same bitmask as for transaction ids

        long id = super.getNextRequestId();
        // The 2 left-most bytes are used for the controller id
        // The 6 right-most bytes are used for the request id
        id = id & TRANSACTION_ID_BIT_MASK;
        id = id | controllerId;
        return id;
    }

    /**
     * Get the trace logger of this DistributedRequestManager
     *
     * @return a <code>Trace</code> object
     */
    public Trace getLogger() {
        return logger;
    }

    /**
     * Returns the vdb value.
     *
     * @return Returns the vdb.
     */
    public VirtualDatabase getVirtualDatabase() {
        return dvdb;
    }

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setScheduler(org.continuent.sequoia.controller.scheduler.AbstractScheduler)
     */
    public void setScheduler(AbstractScheduler scheduler) {
        super.setScheduler(scheduler);
        // Note: don't try to use this.dvdb here: setScheduler is called by the
        // c'tor, and dvdb is not set at this time.
        if (vdb.getTotalOrderQueue() == null)
            throw new RuntimeException(
                    "New scheduler does not support total ordering and is not compatible with distributed virtual databases.");
    }

    //
    // Database Backends management
    //

    /**
     * Enable a backend that has been previously added to this virtual database
     * and that is in the disabled state. We check with the other controllers
     * whether this backend must be enabled in read-only or read-write mode. The
     * current policy is that the first one to enable this backend will have
     * read-write access to it and the others will be in read-only mode.
     *
     * @param db The database backend to enable
     * @throws SQLException if an error occurs
     */
    public void enableBackend(DatabaseBackend db) throws SQLException {
        int size = dvdb.getAllMemberButUs().size();
        if (size > 0) {
            logger
                    .debug(Translate
                            .get("virtualdatabase.distributed.enable.backend.check"));

            try {
                // Notify other controllers that we enable this backend.
                // No answer is expected.
                dvdb.getMulticastRequestAdapter().multicastMessage(
                        dvdb.getAllMemberButUs(),
                        new NotifyEnableBackend(new BackendInfo(db)),
                        MulticastRequestAdapter.WAIT_NONE,
                        dvdb.getMessageTimeouts()
                                .getEnableBackendTimeout());
            } catch (Exception e) {
                String msg = "Error while enabling backend "
                        + db.getName();
                logger.error(msg, e);
                throw new SQLException(msg + " (" + e + ")");
            }
        }

        super.enableBackend(db);
    }

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#disableBackend(org.continuent.sequoia.controller.backend.DatabaseBackend,
     *      boolean)
     */
    public void disableBackend(DatabaseBackend db, boolean forceDisable)
            throws SQLException {
        int size = dvdb.getAllMemberButUs().size();
        if (size > 0) {
            logger.debug(Translate.get(
                    "virtualdatabase.distributed.disable.backend", db
                            .getName()));

            try {
                // Notify other controllers that we disable this backend.
                // No answer is expected.
                dvdb.getMulticastRequestAdapter().multicastMessage(
                        dvdb.getAllMemberButUs(),
                        new NotifyDisableBackend(new BackendInfo(db)),
                        MulticastRequestAdapter.WAIT_NONE,
                        dvdb.getMessageTimeouts()
                                .getDisableBackendTimeout());
            } catch (Exception e) {
                String msg = "Error while disabling backend "
                        + db.getName();
                logger.error(msg, e);
                throw new SQLException(msg + " (" + e + ")");
            }
        }

        super.disableBackend(db, forceDisable);
    }

    /**
     * {@inheritDoc}
     *
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#disableBackendsWithCheckpoint(java.util.ArrayList,
     *      java.lang.String)
     */
    public void disableBackendsWithCheckpoint(ArrayList backendInfos,
            String checkpointName) throws SQLException {
        // Perform the distributed call through the group-comm, in order to
        // atomically disable the backend and store a cluster-wide checkpoint.
        try {
            // Suspend transactions
            suspendActivity();

            dvdb.sendMessageToControllers(dvdb.getAllMembers(),
                    new DisableBackendsAndSetCheckpoint(backendInfos,
                            checkpointName), dvdb.getMessageTimeouts()
                            .getDisableBackendTimeout());
        } catch (Exception e) {
            String msg = "Error while disabling backends "
                    + backendInfos;
            logger.error(msg, e);
            throw new SQLException(msg + "(" + e + ")");
        } finally {
            resumeActivity();
        }
    }

    //
    // Failover management
    //

    private class FailureInformation {
        private boolean needSchedulerNotification;
        private long logId = -1;
        private boolean success;
        private boolean disableBackendOnSuccess;
        private int updateCount;

        /**
         * Creates a new <code>FailureInformation</code> object storing the
         * information about a failure that waits for a remote controller final
         * status.
         *
         * @param needSchedulerNotification true if the scheduler must be notified
         * @param logId the recovery log id of the query
         */
        public FailureInformation(boolean needSchedulerNotification,
                long logId) {
            this.needSchedulerNotification = needSchedulerNotification;
            this.logId = logId;
        }

        /**
         * Creates a new <code>FailureInformation</code> object storing the final
         * status from a remote controller. This version of the constructor is used
         * when the remote controller sends the final status before the local
         * controller has completed the request. This can happen in the case of a
         * timeout.
         *
         * @param success indicates the result of the operation on the remote
         *          controller
         * @param disableBackendOnSuccess indicates whether or not the backend
         *          should be disabled.
         * @param updateCount the update count for the request
         */
        public FailureInformation(boolean success,
                boolean disableBackendOnSuccess, int updateCount) {
            this.success = success;
            this.disableBackendOnSuccess = disableBackendOnSuccess;
            this.updateCount = updateCount;
        }

        /**
         * Returns the recovery log id value.
         *
         * @return the recovery log id.
         */
        public final long getLogId() {
            return logId;
        }

        /**
         * Sets the local logId for the request
         *
         * @param logId the log id to set
         */
        public void setLogId(long logId) {
            this.logId = logId;
        }

        /**
         * Returns true if scheduler notification is needed.
         *
         * @return true if scheduler notification is needed.
         */
        public final boolean needSchedulerNotification() {
            return needSchedulerNotification;
        }

        /**
         * Sets the local scheduler notification indicator
         *
         * @param needSchedulerNotification true if scheduler notification is
         *          needed.
         */
        public void setNeedSchedulerNotification(
                boolean needSchedulerNotification) {
            this.needSchedulerNotification = needSchedulerNotification;
        }

        /**
         * Indicates whether or not the backend should be disabled.
         *
         * @return true if backend must be disabled
         */
        public boolean isDisableBackendOnSuccess() {
            return disableBackendOnSuccess;
        }

        /**
         * Indicates whether or not the query was successful on the remote
         * controller.
         *
         * @return true if the query was successful on one of the remote
         *         controllers.
         */
        public boolean isSuccess() {
            return success;
        }

        /**
         * Returns the update count (only meaningful if this was a request returning
         * an update count)
         *
         * @return update count
         */
        public int getUpdateCount() {
            return updateCount;
        }

    }

    private void logRequestCompletionAndNotifyScheduler(
            AbstractRequest request, boolean success,
            FailureInformation failureInfo, int updateCount) {
        // Update recovery log with completion information
        if (recoveryLog != null) {
            boolean mustLog = !request.isReadOnly();
            if (request instanceof StoredProcedure) {
                DatabaseProcedureSemantic semantic = ((StoredProcedure) request)
                        .getSemantic();
                mustLog = (semantic == null) || semantic.isWrite();
            }
            if (mustLog && failureInfo.getLogId() != 0)
                recoveryLog.logRequestCompletion(
                        failureInfo.getLogId(), success, request
                                .getExecTimeInMs(), updateCount);
        }

        if (failureInfo.needSchedulerNotification()) {
            try {
                // Notify scheduler now, the notification was postponed when
                // addFailedOnAllBackends was called.
                if (request instanceof StoredProcedure)
                    scheduler
                            .storedProcedureCompleted((StoredProcedure) request);
                else if (!request.isAutoCommit()
                        && (request instanceof UnknownWriteRequest)) {
                    String sql = request.getSqlOrTemplate();
                    TransactionMetaData tm = new TransactionMetaData(
                            request.getTransactionId(), 0, request
                                    .getLogin(), request
                                    .isPersistentConnection(), request
                                    .getPersistentConnectionId());
                    if ("commit".equals(sql))
                        scheduler.commitCompleted(tm, success);
                    else if ("rollback".equals(sql)
                            || "abort".equals(sql))
                        scheduler.rollbackCompleted(tm, success);
                    else if (sql.startsWith("rollback")) // rollback to savepoint
                        scheduler.savepointCompleted(tm
                                .getTransactionId());
                    else if (sql.startsWith("release "))
                        scheduler.savepointCompleted(tm
                                .getTransactionId());
                    else if (sql.startsWith("savepoint "))
                        scheduler.savepointCompleted(tm
                                .getTransactionId());
                    else
                        // Real UnknownWriteRequest
                        scheduler
                                .writeCompleted((AbstractWriteRequest) request);
                } else
                    // Just an AbstractWriteRequest
                    scheduler
                            .writeCompleted((AbstractWriteRequest) request);
            } catch (SQLException e) {
                logger.warn("Failed to notify scheduler for request "
                        + request, e);
            }
        }
    }
0539:
    /**
     * Add a request that failed on all backends.
     *
     * @param request the request that failed
     * @param needSchedulerNotification true if the request has been scheduled but
     *          the scheduler has not been notified yet of the request completion
     * @see #completeFailedOnAllBackends(AbstractRequest, boolean, boolean, int)
     */
    public void addFailedOnAllBackends(AbstractRequest request,
            boolean needSchedulerNotification) {
        synchronized (failedOnAllBackends) {
            /*
             * Failure information may already exist if the request timed out at
             * the originating controller, in which case we have all the
             * information required to complete the request.
             */
            FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
                    .get(request);
            if (failureInfo == null)
                failedOnAllBackends.put(request, new FailureInformation(
                        needSchedulerNotification, request.getLogId()));
            else {
                failureInfo.setLogId(request.getLogId());
                failureInfo
                        .setNeedSchedulerNotification(needSchedulerNotification);
                completeFailedOnAllBackends(request, failureInfo.isSuccess(),
                        failureInfo.isDisableBackendOnSuccess(),
                        failureInfo.updateCount);
            }
        }
    }

    /**
     * Clean up all queries registered as failed that were issued from the given
     * controller. This is used when a controller has failed and no status
     * information will be returned for failed queries. Queries are
     * systematically tagged as failed.
     * <p>
     * FIXME: This is only correct with 2 controllers, in a 3+ controller
     * scenario, we need to check from other controllers if someone succeeded.
     *
     * @param failedControllerId id of the controller that has failed
     */
    public void cleanupAllFailedQueriesFromController(long failedControllerId) {
        synchronized (failedOnAllBackends) {
            for (Iterator iter = failedOnAllBackends.keySet().iterator(); iter
                    .hasNext();) {
                AbstractRequest request = (AbstractRequest) iter.next();
                if (((request.getId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)
                        || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)
                        || (request.isPersistentConnection() && (request
                                .getPersistentConnectionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)) {
                    // Need to remove that entry
                    FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
                            .get(request);
                    // failedOnAllBackends can contain completion status
                    // information for requests that failed before we started
                    // processing requests. These entries do not have a logId and
                    // should be ignored.
                    if (failureInfo.getLogId() > 0) {
                        if (logger.isInfoEnabled())
                            logger.info("No status information received for request "
                                    + request
                                    + ", considering status as failed.");

                        // If the current request is a rollback / abort, we need to
                        // call logRequestCompletionAndNotifyScheduler with success
                        // set to true for the transaction to be correctly cleaned up
                        boolean isAbortOrRollback = (request instanceof UnknownWriteRequest)
                                && ("rollback".equals(request.getSqlOrTemplate())
                                        || "abort".equals(request.getSqlOrTemplate()));

                        logRequestCompletionAndNotifyScheduler(request,
                                isAbortOrRollback, failureInfo, -1);
                    }
                    iter.remove();
                }
            }
        }
    }

    /**
     * Notify completion of a request that either failed on all backends or was
     * not executed at all (NoMoreBackendException). If completion was successful
     * and the query failed on all backends locally, all local backends are
     * disabled if disableBackendOnSuccess is true.
     *
     * @param request request that completed
     * @param success true if completion is successful
     * @param disableBackendOnSuccess disable all local backends if query was
     *          successful (but failed locally)
     * @param updateCount request update count to be logged in recovery log
     * @see #addFailedOnAllBackends(AbstractRequest, boolean)
     */
    public void completeFailedOnAllBackends(AbstractRequest request,
            boolean success, boolean disableBackendOnSuccess, int updateCount) {
        FailureInformation failureInfo;
        synchronized (failedOnAllBackends) {
            failureInfo = (FailureInformation) failedOnAllBackends
                    .remove(request);
            if (failureInfo == null) {
                /*
                 * If we can't find failureInformation, assume the remote controller
                 * failed the request before it completed locally. This is probably
                 * due to a timeout.
                 */
                failureInfo = new FailureInformation(success,
                        disableBackendOnSuccess, updateCount);
                failedOnAllBackends.put(request, failureInfo);

                logger.info("Unable to find request "
                        + request.getSqlShortForm(dvdb.getSqlShortFormLength())
                        + " in list of requests that failed on all backends.");
                return;
            }
        }
        logRequestCompletionAndNotifyScheduler(request, success, failureInfo,
                updateCount);

        if (disableBackendOnSuccess && success) {
            // Now really disable the backends
            String message = "Request "
                    + request.getSqlShortForm(dvdb.getSqlShortFormLength())
                    + " failed on all local backends but succeeded on other controllers. Disabling all local backends.";
            logger.error(message);
            endUserLogger.error(message);

            try {
                dvdb.disableAllBackends(true);
            } catch (VirtualDatabaseException e) {
                logger.error("An error occurred while disabling all backends", e);
            }
        }
    }

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#closePersistentConnection(java.lang.String,
     *      long)
     */
    public void distributedClosePersistentConnection(String login,
            long persistentConnectionId) {
        List groupMembers = dvdb.getCurrentGroup().getMembers();

        if (logger.isDebugEnabled())
            logger.debug("Broadcasting closing persistent connection "
                    + persistentConnectionId + " for user " + login
                    + " to all controllers ("
                    + dvdb.getChannel().getLocalMembership() + "->"
                    + groupMembers.toString() + ")");

        // Send the query to everybody including us
        try {
            dvdb.getMulticastRequestAdapter().multicastMessage(
                    groupMembers,
                    new DistributedClosePersistentConnection(login,
                            persistentConnectionId),
                    MulticastRequestAdapter.WAIT_ALL, 0);
        } catch (Exception e) {
            String msg = "An error occurred while executing distributed persistent connection "
                    + persistentConnectionId + " closing";
            logger.warn(msg, e);
        }

        if (logger.isDebugEnabled())
            logger.debug("Persistent connection " + persistentConnectionId
                    + " closed.");
    }

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#openPersistentConnection(String,
     *      long)
     */
    public void distributedOpenPersistentConnection(String login,
            long persistentConnectionId) throws SQLException {
        List groupMembers = dvdb.getCurrentGroup().getMembers();

        if (logger.isDebugEnabled())
            logger.debug("Broadcasting opening persistent connection "
                    + persistentConnectionId + " for user " + login
                    + " to all controllers ("
                    + dvdb.getChannel().getLocalMembership() + "->"
                    + groupMembers.toString() + ")");

        boolean success = false;
        Exception exception = null;
        try {
            // Send the query to everybody including us
            MulticastResponse responses = dvdb.getMulticastRequestAdapter()
                    .multicastMessage(
                            groupMembers,
                            new DistributedOpenPersistentConnection(login,
                                    persistentConnectionId),
                            MulticastRequestAdapter.WAIT_ALL, 0);

            // get a list that won't change while we go through it
            groupMembers = dvdb.getAllMembers();
            int size = groupMembers.size();
            ArrayList failedControllers = null;
            // Get the result of each controller
            for (int i = 0; i < size; i++) {
                Member member = (Member) groupMembers.get(i);
                if ((responses.getFailedMembers() != null)
                        && responses.getFailedMembers().contains(member)) {
                    logger.warn("Controller " + member
                            + " is suspected of failure.");
                    continue;
                }
                Object r = responses.getResult(member);
                if (r instanceof Boolean) {
                    if (((Boolean) r).booleanValue())
                        success = true;
                    else
                        logger.error("Unexpected result for controller "
                                + member);
                } else if (r instanceof Exception) {
                    if (failedControllers == null)
                        failedControllers = new ArrayList();
                    failedControllers.add(member);
                    if (exception == null)
                        exception = (Exception) r;
                    if (logger.isDebugEnabled())
                        logger.debug("Controller " + member
                                + " failed to open persistent connection "
                                + persistentConnectionId + " (" + r + ")");
                }
            }

            /*
             * Notify all controllers where all backends failed (if any) of the
             * completion status.
             */
            if (failedControllers != null) {
                UnknownWriteRequest notifRequest = new UnknownWriteRequest(
                        "open " + persistentConnectionId, false, 0, null);
                notifyRequestCompletion(notifRequest, success, false,
                        failedControllers);
            }
        } catch (Exception e) {
            String msg = "An error occurred while executing distributed persistent connection "
                    + persistentConnectionId + " opening";
            logger.warn(msg, e);
        }

        if (success) {
            if (logger.isDebugEnabled())
                logger.debug("Persistent connection "
                        + persistentConnectionId + " opened.");
            return; // This is a success if at least one controller has succeeded
        }

        // At this point, all controllers failed
        String msg = "Failed to open persistent connection "
                + persistentConnectionId + " on all controllers ("
                + exception + ")";
        logger.warn(msg);
        throw new SQLException(msg);
    }

    /**
     * Notify all members of the failover for the specified persistent connection.
     *
     * @param persistentConnectionId persistent connection id
     * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForPersistentConnection(long)
     */
    public void distributedFailoverForPersistentConnection(
            long persistentConnectionId) {
        List groupMembers = dvdb.getCurrentGroup().getMembers();

        if (logger.isDebugEnabled())
            logger.debug("Broadcasting failover for persistent connection "
                    + persistentConnectionId + " to all controllers ("
                    + dvdb.getChannel().getLocalMembership() + "->"
                    + groupMembers.toString() + ")");

        // Send the query to everybody including us
        try {
            dvdb.getMulticastRequestAdapter().multicastMessage(
                    groupMembers,
                    new FailoverForPersistentConnection(persistentConnectionId),
                    MulticastRequestAdapter.WAIT_ALL, 0);
        } catch (Exception e) {
            String msg = "An error occurred while notifying distributed persistent connection "
                    + persistentConnectionId + " failover";
            logger.warn(msg, e);
        }
    }

    /**
     * Notify all members of the failover for the specified transaction.
     *
     * @param currentTid transaction id
     * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForTransaction(long)
     */
    public void distributedFailoverForTransaction(long currentTid) {
        List groupMembers = dvdb.getCurrentGroup().getMembers();

        if (logger.isDebugEnabled())
            logger.debug("Broadcasting failover for transaction "
                    + currentTid + " to all controllers ("
                    + dvdb.getChannel().getLocalMembership() + "->"
                    + groupMembers.toString() + ")");

        // Send the query to everybody including us
        try {
            dvdb.getMulticastRequestAdapter().multicastMessage(
                    groupMembers,
                    new FailoverForTransaction(currentTid),
                    MulticastRequestAdapter.WAIT_ALL, 0);
        } catch (Exception e) {
            String msg = "An error occurred while notifying distributed transaction "
                    + currentTid + " failover";
            logger.warn(msg, e);
        }
    }

    //
    // Transaction management
    //

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#abort(long,
     *      boolean, boolean)
     */
    public void abort(long transactionId, boolean logAbort, boolean forceAbort)
            throws SQLException {
        Long lTid = new Long(transactionId);
        TransactionMetaData tm;
        try {
            tm = getTransactionMetaData(lTid);
            if (!forceAbort && tidSavepoints.get(lTid) != null) {
                if (logger.isDebugEnabled())
                    logger.debug("Transaction " + transactionId
                            + " has savepoints, transaction will not be aborted");
                return;
            }
        } catch (SQLException e1) {
            logger.warn("No transaction metadata found to abort transaction "
                    + transactionId + ". Creating a fake context for abort.");
            // We ignore the persistent connection id here (retrieved by connection
            // manager)
            tm = new TransactionMetaData(transactionId, 0,
                    RecoveryLog.UNKNOWN_USER, false, 0);
            if (tidSavepoints.get(lTid) != null) {
                if (logger.isDebugEnabled())
                    logger.debug("Transaction " + transactionId
                            + " has savepoints, transaction will not be aborted");
                return;
            }
        }

        boolean isAWriteTransaction;
        synchronized (distributedTransactions) {
            isAWriteTransaction = distributedTransactions.contains(lTid);
        }
        if (isAWriteTransaction) {
            distributedAbort(tm.getLogin(), transactionId);
        } else {
            // read-only transaction, it is local but we still have to post the query
            // in the total order queue. Note that we post a Rollback object because
            // the load balancer will treat the abort as a rollback.
            LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
            synchronized (totalOrderQueue) {
                totalOrderQueue.addLast(new DistributedRollback(tm.getLogin(),
                        transactionId));
            }
            super.abort(transactionId, logAbort, forceAbort);
        }
    }

    /**
     * Overrides RequestManager.begin to apply bit masks to the tid returned by
     * the scheduler.
     *
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#begin(String,
     *      boolean, long)
     */
    public long begin(String login, boolean isPersistentConnection,
            long persistentConnectionId) throws SQLException {
        long tid = scheduler.getNextTransactionId();
        // The 2 left-most bytes are used for the controller id,
        // the 6 right-most bytes are used for the transaction id
        tid = tid & TRANSACTION_ID_BIT_MASK;
        tid = tid | controllerId;
        doBegin(login, tid, isPersistentConnection, persistentConnectionId);
        return tid;
    }

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#commit(long,
     *      boolean, boolean)
     */
    public void commit(long transactionId, boolean logCommit,
            boolean emptyTransaction) throws SQLException {
        Long lTid = new Long(transactionId);
        TransactionMetaData tm = getTransactionMetaData(lTid);
        boolean isAWriteTransaction;
        synchronized (distributedTransactions) {
            isAWriteTransaction = distributedTransactions.contains(lTid);
        }
        if (isAWriteTransaction) {
            distributedCommit(tm.getLogin(), transactionId);
        } else {
            // read-only transaction, it is local
            DistributedCommit commit = new DistributedCommit(tm.getLogin(),
                    transactionId);
            if (!emptyTransaction) {
                LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
                synchronized (totalOrderQueue) {
                    totalOrderQueue.addLast(commit);
                }
            }
            try {
                super.commit(transactionId, logCommit, emptyTransaction);
            } catch (SQLException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Ignoring failure of commit for read-only transaction, exception was: "
                            + e);
                }

                // Force transaction completion on scheduler
                scheduler.commit(tm, emptyTransaction, commit);
                scheduler.commitCompleted(tm, true);

                // Clean-up transactional context
                completeTransaction(lTid);
            }
        }
    }

    /**
     * @see org.continuent.sequoia.controller.requestmanager.RequestManager#completeTransaction(java.lang.Long)
     */
    public void completeTransaction(Long tid) {
        synchronized (distributedTransactions) {
            distributedTransactions.remove(tid);
        }
        super.completeTransaction(tid);
    }

    /**
     * Check if the transaction corresponding to the given query has been started
     * remotely and start the transaction locally in a lazy manner if needed. This
     * also checks if a local request must trigger the logging of a 'begin' in the
     * recovery log.
     *
     * @param request query to execute
     * @throws SQLException if an error occurs
     */
    public void lazyTransactionStart(AbstractRequest request)
            throws SQLException {
        // Check if this is a remotely started transaction that we need to lazily
        // start locally. Note that we cannot decide from its id that a transaction
        // has been started remotely. In a failover case the client still uses the
        // transaction id given by the original controller. See SEQUOIA-930.
        if (!request.isAutoCommit()) {
            long tid = request.getTransactionId();
            Long lTid = new Long(tid);
            TransactionMetaData tm = (TransactionMetaData) transactionMetaDatas
                    .get(lTid);

            // Check if transaction is started
            if (tm != null) {
                /*
                 * It may have been started by a failover before any writes were
                 * executed.
                 */
                if (tm.isReadOnly()) {
                    request.setIsLazyTransactionStart(true);
                    tm.setReadOnly(false);
                }
                return; // transaction already started
            }
            // Begin this transaction
            try {
                tm = new TransactionMetaData(tid, beginTimeout,
                        request.getLogin(), request.isPersistentConnection(),
                        request.getPersistentConnectionId());
                tm.setReadOnly(false);

                if (logger.isDebugEnabled())
                    logger.debug(Translate.get("transaction.begin.lazy",
                            String.valueOf(tid)));

                scheduler.begin(tm, true, request);

                try {
                    // Send to load balancer
                    loadBalancer.begin(tm);

                    // We need to update the tid table first so that
                    // logLazyTransactionBegin can retrieve the metadata
                    transactionMetaDatas.put(lTid, tm);
                    request.setIsLazyTransactionStart(true);

                    synchronized (distributedTransactions) {
                        if (!distributedTransactions.contains(lTid))
                            distributedTransactions.add(lTid);
                    }
                } catch (SQLException e) {
                    if (recoveryLog != null)
                        // In case logLazyTransactionBegin failed
                        transactionMetaDatas.remove(lTid);
                    throw e;
                } finally {
                    // Notify scheduler for completion in any case
                    scheduler.beginCompleted(tid);
                }
            } catch (RuntimeException e) {
                String msg = Translate
                        .get("fatal.runtime.exception.requestmanager.begin");
                logger.fatal(msg, e);
                endUserLogger.fatal(msg);
                throw new SQLException(e.getMessage());
            }
        }
    }

1088:            /**
1089:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#rollback(long,
1090:             *      boolean)
1091:             */
1092:            public void rollback(long transactionId, boolean logRollback)
1093:                    throws SQLException {
1094:                Long lTid = new Long(transactionId);
1095:                TransactionMetaData tm = getTransactionMetaData(lTid);
1096:                boolean isAWriteTransaction;
1097:                synchronized (distributedTransactions) {
1098:                    isAWriteTransaction = distributedTransactions
1099:                            .contains(lTid);
1100:                }
1101:                if (isAWriteTransaction) {
1102:                    distributedRollback(tm.getLogin(), transactionId);
1103:                } else {
1104:                    // read-only transaction, it is local
1105:                    DistributedRollback rollback = new DistributedRollback(tm
1106:                            .getLogin(), transactionId);
1107:                    LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
1108:                    synchronized (totalOrderQueue) {
1109:                        totalOrderQueue.addLast(rollback);
1110:                    }
1111:                    try {
1112:                        super.rollback(transactionId, logRollback);
1113:                    } catch (SQLException e) {
1114:                        if (logger.isWarnEnabled()) {
1115:                            logger
1116:                                    .warn("Ignoring failure of rollback for read-only transaction, exception was: "
1117:                                            + e);
1118:                        }
1119:
1120:                        // Force transaction completion on scheduler
1121:                        try {
1122:                            scheduler.rollback(tm, rollback);
1123:                        } catch (SQLException ignore) { // best effort: completion is forced below regardless
1124:                        }
1125:                        scheduler.rollbackCompleted(tm, true);
1126:
1127:                        // Clean-up transactional context
1128:                        completeTransaction(lTid);
1129:                    }
1130:                }
1131:            }
1132:
1133:            /**
1134:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#rollback(long,
1135:             *      String)
1136:             */
1137:            public void rollback(long transactionId, String savepointName)
1138:                    throws SQLException {
1139:                Long lTid = new Long(transactionId);
1140:                boolean isAWriteTransaction;
1141:                synchronized (distributedTransactions) {
1142:                    isAWriteTransaction = distributedTransactions
1143:                            .contains(lTid);
1144:                }
1145:                if (isAWriteTransaction) {
1146:                    TransactionMetaData tm = getTransactionMetaData(lTid);
1147:                    distributedRollback(tm.getLogin(), transactionId,
1148:                            savepointName);
1149:                } else { // read-only transaction, it is local
1150:                    LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
1151:                    synchronized (totalOrderQueue) {
1152:                        totalOrderQueue
1153:                                .addLast(new DistributedRollbackToSavepoint(
1154:                                        transactionId, savepointName));
1155:                    }
1156:                    super.rollback(transactionId, savepointName);
1157:                }
1158:            }
1159:
1160:            /**
1161:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setSavepoint(long)
1162:             */
1163:            public int setSavepoint(long transactionId) throws SQLException {
1164:                Long lTid = new Long(transactionId);
1165:                int savepointId = scheduler.incrementSavepointId();
1166:                TransactionMetaData tm = getTransactionMetaData(lTid);
1167:                synchronized (distributedTransactions) {
1168:                    if (!distributedTransactions.contains(lTid))
1169:                        distributedTransactions.add(lTid);
1170:                }
1171:                distributedSetSavepoint(tm.getLogin(), transactionId, String
1172:                        .valueOf(savepointId));
1173:                return savepointId;
1174:            }
1175:
1176:            /**
1177:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setSavepoint(long,
1178:             *      String)
1179:             */
1180:            public void setSavepoint(long transactionId, String name)
1181:                    throws SQLException {
1182:                Long lTid = new Long(transactionId);
1183:                TransactionMetaData tm = getTransactionMetaData(lTid);
1184:                synchronized (distributedTransactions) {
1185:                    if (!distributedTransactions.contains(lTid))
1186:                        distributedTransactions.add(lTid);
1187:                }
1188:                distributedSetSavepoint(tm.getLogin(), transactionId, name);
1189:            }
1190:
1191:            /**
1192:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#releaseSavepoint(long,
1193:             *      String)
1194:             */
1195:            public void releaseSavepoint(long transactionId, String name)
1196:                    throws SQLException {
1197:                Long lTid = new Long(transactionId);
1198:                boolean isAWriteTransaction;
1199:                synchronized (distributedTransactions) {
1200:                    isAWriteTransaction = distributedTransactions
1201:                            .contains(lTid);
1202:                }
1203:                if (isAWriteTransaction) {
1204:                    TransactionMetaData tm = getTransactionMetaData(lTid);
1205:                    distributedReleaseSavepoint(tm.getLogin(), transactionId,
1206:                            name);
1207:                } else {
1208:                    // read-only transaction, it is local
1209:                    LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
1210:                    synchronized (totalOrderQueue) {
1211:                        totalOrderQueue
1212:                                .addLast(new DistributedReleaseSavepoint(
1213:                                        transactionId, name));
1214:                    }
1215:                    super.releaseSavepoint(transactionId, name);
1216:                }
1217:            }
1218:
1219:            /**
1220:             * Add this transaction to the list of write transactions that need to be
1221:             * globally committed. This happens if the transaction has only been started
1222:             * locally but not through a lazy start.
1223:             */
1224:            private void addToDistributedTransactionListIfNeeded(
1225:                    AbstractRequest request) {
1226:                // Add to distributed transactions if needed
1227:                if (!request.isAutoCommit()) {
1228:                    Long lTid = new Long(request.getTransactionId());
1229:                    synchronized (distributedTransactions) {
1230:                        if (!distributedTransactions.contains(lTid))
1231:                            distributedTransactions.add(lTid);
1232:                    }
1233:                }
1234:            }
1235:
1236:            /**
1237:             * Retrieve the vLogin corresponding to the persistent connection id provided
1238:             * and close the connection if found. This is used by the
1239:             * ControllerFailureCleanupThread to clean up remaining persistent connections
1240:             * from a failed controller whose clients never recovered.
1241:             * 
1242:             * @param connectionId the persistent connection id
1243:             */
1244:            public void closePersistentConnection(Long connectionId) {
1245:                String vLogin = scheduler
1246:                        .getPersistentConnectionLogin(connectionId);
1247:                if (vLogin != null)
1248:                    super.closePersistentConnection(vLogin, connectionId
1249:                            .longValue());
1250:            }
1251:
1252:            /**
1253:             * Performs a local read operation, as opposed to execReadRequest() which
1254:             * falls back to distributed reads when a NoMoreBackendException is thrown.
1255:             * 
1256:             * @param request the read request to perform
1257:             * @return a ControllerResultSet
1258:             * @throws NoMoreBackendException when no more local backends are available to
1259:             *           execute the request
1260:             * @throws SQLException in case of error
1261:             */
1262:            public ControllerResultSet execLocalStatementExecuteQuery(
1263:                    SelectRequest request) throws NoMoreBackendException,
1264:                    SQLException {
1265:                return super.statementExecuteQuery(request);
1266:            }
1267:
1268:            /**
1269:             * Execute a read request on some remote controller - one in the group. Used
1270:             * when the local controller has no backend available to execute the request.
1271:             * 
1272:             * @param request the request to execute
1273:             * @return the query ResultSet
1274:             * @throws SQLException in case of bad request
1275:             */
1276:            public abstract ControllerResultSet execRemoteStatementExecuteQuery(
1277:                    SelectRequest request) throws SQLException;
1278:
1279:            /**
1280:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest)
1281:             */
1282:            public ControllerResultSet statementExecuteQuery(
1283:                    SelectRequest request) throws SQLException {
1284:                if (!request.isMustBroadcast()) {
1285:                    try {
1286:                        return execLocalStatementExecuteQuery(request);
1287:                    } catch (SQLException e) {
1288:                        if (!(e instanceof NoMoreBackendException))
1289:                            throw e;
1290:                        // No local backend could serve the request,
1291:                        // so try it on the other controllers.
1292:                        addToDistributedTransactionListIfNeeded(request);
1293:                        return execRemoteStatementExecuteQuery(request);
1294:                    }
1295:                }
1296:                addToDistributedTransactionListIfNeeded(request);
1297:                return distributedStatementExecuteQuery(request);
1298:            }
1299:
1300:            /**
1301:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteUpdate(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1302:             */
1303:            public ExecuteUpdateResult statementExecuteUpdate(
1304:                    AbstractWriteRequest request) throws SQLException {
1305:                if (!request.isAutoCommit()) { /*
1306:                 * Add this transaction to the list of write transactions that need to be
1307:                 * globally committed. This happens if the transaction has only been started
1308:                 * locally but not through a lazy start.
1309:                 */
1310:                    Long lTid = new Long(request.getTransactionId());
1311:                    synchronized (distributedTransactions) {
1312:                        if (!distributedTransactions.contains(lTid))
1313:                            distributedTransactions.add(lTid);
1314:                    }
1315:                }
1316:                return distributedStatementExecuteUpdate(request);
1317:            }
1318:
1319:            /**
1320:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteUpdateWithKeys(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1321:             */
1322:            public GeneratedKeysResult statementExecuteUpdateWithKeys(
1323:                    AbstractWriteRequest request) throws SQLException {
1324:                if (!request.isAutoCommit()) { /*
1325:                 * Add this transaction to the list of write transactions that need to be
1326:                 * globally committed. This happens if the transaction has only been started
1327:                 * locally but not through a lazy start.
1328:                 */
1329:                    Long lTid = new Long(request.getTransactionId());
1330:                    synchronized (distributedTransactions) {
1331:                        if (!distributedTransactions.contains(lTid))
1332:                            distributedTransactions.add(lTid);
1333:                    }
1334:                }
1335:                return distributedStatementExecuteUpdateWithKeys(request);
1336:            }
1337:
1338:            /**
1339:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecute(AbstractRequest)
1340:             */
1341:            public ExecuteResult statementExecute(AbstractRequest request)
1342:                    throws SQLException {
1343:                if (!request.isAutoCommit()) { /*
1344:                 * Add this transaction to the list of write transactions that need to be
1345:                 * globally committed. This happens if the transaction has only been started
1346:                 * locally but not through a lazy start.
1347:                 */
1348:                    Long lTid = new Long(request.getTransactionId());
1349:                    synchronized (distributedTransactions) {
1350:                        if (!distributedTransactions.contains(lTid))
1351:                            distributedTransactions.add(lTid);
1352:                    }
1353:                }
1354:                return distributedStatementExecute(request);
1355:            }
1356:
1357:            /**
1358:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#scheduleExecWriteRequest(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1359:             */
1360:            public void scheduleExecWriteRequest(AbstractWriteRequest request)
1361:                    throws SQLException {
1362:                lazyTransactionStart(request);
1363:                super.scheduleExecWriteRequest(request);
1364:            }
1365:
1366:            /**
1367:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecuteQuery(StoredProcedure)
1368:             */
1369:            public ControllerResultSet callableStatementExecuteQuery(
1370:                    StoredProcedure proc) throws SQLException {
1371:                // Parse the query first to update the semantic information
1372:                getParsingFromCacheOrParse(proc);
1373:
1374:                // If procedure is read-only, we don't broadcast
1375:                DatabaseProcedureSemantic semantic = proc.getSemantic();
1376:                if (proc.isReadOnly()
1377:                        || ((semantic != null) && (semantic.isReadOnly()))) {
1378:                    try {
1379:                        proc.setIsReadOnly(true);
1380:                        return execLocallyCallableStatementExecuteQuery(proc);
1381:                    } catch (AllBackendsFailedException ignore) {
1382:                        // This failed locally, try it remotely
1383:                    } catch (SQLException e) {
1384:                        if (!(e instanceof NoMoreBackendException))
1385:                            throw e;
1386:                        // else this failed locally, try it remotely
1387:                    }
1388:                }
1389:
1390:                addToDistributedTransactionListIfNeeded(proc);
1391:                return distributedCallableStatementExecuteQuery(proc);
1392:            }
1393:
1394:            /**
1395:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
1396:             */
1397:            public ExecuteUpdateResult callableStatementExecuteUpdate(
1398:                    StoredProcedure proc) throws SQLException {
1399:                if (!proc.isAutoCommit()) { /*
1400:                 * Add this transaction to the list of write transactions that need to be
1401:                 * globally committed. This happens if the transaction has only been started
1402:                 * locally but not through a lazy start.
1403:                 */
1404:                    Long lTid = new Long(proc.getTransactionId());
1405:                    synchronized (distributedTransactions) {
1406:                        if (!distributedTransactions.contains(lTid))
1407:                            distributedTransactions.add(lTid);
1408:                    }
1409:                }
1410:                return distributedCallableStatementExecuteUpdate(proc);
1411:            }
1412:
1413:            /**
1414:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecute(StoredProcedure)
1415:             */
1416:            public ExecuteResult callableStatementExecute(StoredProcedure proc)
1417:                    throws SQLException {
1418:                // Parse the query first to update the semantic information
1419:                getParsingFromCacheOrParse(proc);
1420:
1421:                // If procedure is read-only, we don't broadcast
1422:                DatabaseProcedureSemantic semantic = proc.getSemantic();
1423:                if (proc.isReadOnly()
1424:                        || ((semantic != null) && (semantic.isReadOnly()))) {
1425:                    try {
1426:                        proc.setIsReadOnly(true);
1427:                        return execLocallyCallableStatementExecute(proc);
1428:                    } catch (AllBackendsFailedException ignore) {
1429:                        // This failed locally, try it remotely
1430:                    } catch (SQLException e) {
1431:                        if (!(e instanceof NoMoreBackendException))
1432:                            throw e;
1433:                        // else this failed locally, try it remotely
1434:                    }
1435:                }
1436:
1437:                if (!proc.isAutoCommit()) { /*
1438:                 * Add this transaction to the list of write transactions that need to be
1439:                 * globally committed. This happens if the transaction has only been started
1440:                 * locally but not through a lazy start.
1441:                 */
1442:                    Long lTid = new Long(proc.getTransactionId());
1443:                    synchronized (distributedTransactions) {
1444:                        if (!distributedTransactions.contains(lTid))
1445:                            distributedTransactions.add(lTid);
1446:                    }
1447:                }
1448:                return distributedCallableStatementExecute(proc);
1449:            }
1450:
1451:            /**
1452:             * Fetch the result of a previously executed request from a remote controller
1453:             * failover cache.
1454:             * 
1455:             * @param successfulControllers controllers to fetch the result from
1456:             * @param id unique identifier of the query to look up the result for
1457:             * @return the request result
1458:             * @throws NoResultAvailableException if no result could be retrieved from the
1459:             *           failover cache
1460:             */
1461:            protected Serializable getRequestResultFromFailoverCache(
1462:                    List successfulControllers, long id)
1463:                    throws NoResultAvailableException {
1464:                List groupMembers = new ArrayList(1);
1465:
1466:                // Try all members in turn and return as soon as one succeeds
1467:                for (Iterator iter = successfulControllers.iterator(); iter
1468:                        .hasNext();) {
1469:                    Member remoteController = (Member) iter.next();
1470:                    groupMembers.clear();
1471:                    groupMembers.add(remoteController);
1472:
1473:                    if (logger.isDebugEnabled())
1474:                        logger.debug("Getting result for request " + id
1475:                                + " from controller " + remoteController);
1476:
1477:                    try { // Send the request to that controller
1478:                        MulticastResponse response = dvdb
1479:                                .getMulticastRequestAdapter().multicastMessage(
1480:                                        groupMembers,
1481:                                        new GetRequestResultFromFailoverCache(
1482:                                                id),
1483:                                        MulticastRequestAdapter.WAIT_ALL, 0);
1484:                        Serializable result = response
1485:                                .getResult(remoteController);
1486:
1487:                        if ((result instanceof Exception)
1488:                                || (response.getFailedMembers() != null)) { // Failure on the remote controller
1489:                            if (logger.isInfoEnabled())
1490:                                logger
1491:                                        .info(
1492:                                                "Controller "
1493:                                                        + remoteController
1494:                                                        + " could not fetch result for request "
1495:                                                        + id,
1496:                                                (result instanceof Exception) ? (Exception) result : null);
1497:                        } else
1498:                            return result;
1499:                    } catch (Exception e) {
1500:                        String msg = "An error occurred while fetching result for request "
1501:                                + id + " from controller " + remoteController;
1502:                        logger.warn(msg, e);
1503:                    }
1504:                }
1505:                throw new NoResultAvailableException(
1506:                        "All controllers failed when trying to fetch result for request "
1507:                                + id);
1508:            }
1509:
1510:            /**
1511:             * Stores a result associated with a request in the request result failover
1512:             * cache.
1513:             * <p>
1514:             * Only results for requests initiated on a remote controller are stored.
1515:             * 
1516:             * @param request the request executed
1517:             * @param result the result of the request
1518:             * @return true if the result was added to the cache, false if the request was
1519:             *         local to this controller
1520:             * @see org.continuent.sequoia.controller.virtualdatabase.RequestResultFailoverCache#store(AbstractRequest,
1521:             *      Serializable)
1522:             */
1523:            public boolean storeRequestResult(AbstractRequest request,
1524:                    Serializable result) {
1525:                // Cache only results for requests initiated by a remote controller.
1526:                if ((request.getId() & CONTROLLER_ID_BIT_MASK) != dvdb
1527:                        .getControllerId()) {
1528:                    dvdb.getRequestResultFailoverCache().store(request, result);
1529:                    return true;
1530:                }
1531:                return false;
1532:            }
1533:
1534:            //
1535:            // RAIDb level specific methods
1536:            //
1537:
1538:            /**
1539:             * Distributed implementation of an abort
1540:             * 
1541:             * @param login login that aborts the transaction
1542:             * @param transactionId id of the transaction to abort
1543:             * @throws SQLException if an error occurs
1544:             */
1545:            public abstract void distributedAbort(String login,
1546:                    long transactionId) throws SQLException;
1547:
1548:            /**
1549:             * Distributed implementation of a commit
1550:             * 
1551:             * @param login login that commits the transaction
1552:             * @param transactionId id of the committing transaction
1553:             * @throws SQLException if an error occurs
1554:             */
1555:            public abstract void distributedCommit(String login,
1556:                    long transactionId) throws SQLException;
1557:
1558:            /**
1559:             * Distributed implementation of a rollback
1560:             * 
1561:             * @param login login that rolls back the transaction
1562:             * @param transactionId id of the transaction to roll back
1563:             * @throws SQLException if an error occurs
1564:             */
1565:            public abstract void distributedRollback(String login,
1566:                    long transactionId) throws SQLException;
1567:
1568:            /**
1569:             * Distributed implementation of a rollback to a savepoint
1570:             * 
1571:             * @param login login that rolls back the transaction
1572:             * @param transactionId id of the transaction
1573:             * @param savepointName name of the savepoint
1574:             * @throws SQLException if an error occurs
1575:             */
1576:            public abstract void distributedRollback(String login,
1577:                    long transactionId, String savepointName)
1578:                    throws SQLException;
1579:
1580:            /**
1581:             * Distributed implementation of setting a savepoint in a transaction
1582:             * 
1583:             * @param login login that sets the savepoint
1584:             * @param transactionId id of the transaction
1585:             * @param name name of the savepoint to set
1586:             * @throws SQLException if an error occurs
1587:             */
1588:            public abstract void distributedSetSavepoint(String login,
1589:                    long transactionId, String name) throws SQLException;
1590:
1591:            /**
1592:             * Distributed implementation of releasing a savepoint from a transaction
1593:             * 
1594:             * @param login login that releases the savepoint
1595:             * @param transactionId id of the transaction
1596:             * @param name name of the savepoint to release
1597:             * @throws SQLException if an error occurs
1598:             */
1599:            public abstract void distributedReleaseSavepoint(String login,
1600:                    long transactionId, String name) throws SQLException;
1601:
1602:            /**
1603:             * Distributed implementation of a select request execution that returns a
1604:             * ResultSet.
1605:             * 
1606:             * @param request request to execute
1607:             * @return ResultSet containing the query result
1608:             * @throws SQLException if an error occurs
1609:             */
1610:            public abstract ControllerResultSet distributedStatementExecuteQuery(
1611:                    SelectRequest request) throws SQLException;
1612:
1613:            /**
1614:             * Distributed implementation of a write request execution.
1615:             * 
1616:             * @param request request to execute
1617:             * @return number of modified rows
1618:             * @throws SQLException if an error occurs
1619:             */
1620:            public abstract ExecuteUpdateResult distributedStatementExecuteUpdate(
1621:                    AbstractWriteRequest request) throws SQLException;
1622:
1623:            /**
1624:             * Distributed implementation of a write request execution that returns
1625:             * auto-generated keys.
1626:             * 
1627:             * @param request request to execute
1628:             * @return update count and ResultSet containing the auto-generated keys.
1629:             * @throws SQLException if an error occurs
1630:             */
1631:            public abstract GeneratedKeysResult distributedStatementExecuteUpdateWithKeys(
1632:                    AbstractWriteRequest request) throws SQLException;
1633:
1634:            /**
1635:             * Distributed implementation of a Statement.execute() execution.
1636:             * 
1637:             * @param request request to execute
1638:             * @return an <code>ExecuteResult</code> object
1639:             * @throws SQLException if an error occurs
1640:             */
1641:            public abstract ExecuteResult distributedStatementExecute(
1642:                    AbstractRequest request) throws SQLException;
1643:
1644:            /**
1645:             * Distributed implementation of a stored procedure
1646:             * CallableStatement.executeQuery() execution.
1647:             * 
1648:             * @param proc stored procedure to execute
1649:             * @return ResultSet corresponding to this stored procedure execution
1650:             * @throws SQLException if an error occurs
1651:             */
1652:            public abstract ControllerResultSet distributedCallableStatementExecuteQuery(
1653:                    StoredProcedure proc) throws SQLException;
1654:
1655:            /**
1656:             * Distributed implementation of a stored procedure
1657:             * CallableStatement.executeUpdate() execution.
1658:             * 
1659:             * @param proc stored procedure to execute
1660:             * @return number of modified rows
1661:             * @throws SQLException if an error occurs
1662:             */
1663:            public abstract ExecuteUpdateResult distributedCallableStatementExecuteUpdate(
1664:                    StoredProcedure proc) throws SQLException;
1665:
1666:            /**
1667:             * Distributed implementation of a stored procedure
1668:             * CallableStatement.execute() execution.
1669:             * 
1670:             * @param proc stored procedure to execute
1671:             * @return an <code>ExecuteResult</code> object
1672:             * @throws SQLException if an error occurs
1673:             */
1674:            public abstract ExecuteResult distributedCallableStatementExecute(
1675:                    StoredProcedure proc) throws SQLException;
1676:
1677:            /**
1678:             * Once the request has been dispatched, it can be executed using the code
1679:             * from <code>RequestManager</code>
1680:             * 
1681:             * @param proc stored procedure to execute
1682:             * @return ResultSet corresponding to this stored procedure execution
1683:             * @throws AllBackendsFailedException if all backends failed to execute the
1684:             *           stored procedure
1685:             * @throws SQLException if an error occurs
1686:             */
1687:            public ControllerResultSet execLocallyCallableStatementExecuteQuery(
1688:                    StoredProcedure proc) throws AllBackendsFailedException,
1689:                    SQLException {
1690:                return super.callableStatementExecuteQuery(proc);
1691:            }
1692:
1693:            /**
1694:             * Once the request has been dispatched, it can be executed using the code
1695:             * from <code>RequestManager</code>
1696:             * 
1697:             * @param proc stored procedure to execute
1698:             * @return ExecuteResult corresponding to this stored procedure execution
1699:             * @throws AllBackendsFailedException if all backends failed to execute the
1700:             *           stored procedure
1701:             * @throws SQLException if an error occurs
1702:             */
1703:            public ExecuteResult execLocallyCallableStatementExecute(
1704:                    StoredProcedure proc) throws AllBackendsFailedException,
1705:                    SQLException {
1706:                return super.callableStatementExecute(proc);
1707:            }
1708:
1709:            /**
1710:             * Test whether a transaction has been started on this controller but was
1711:             * initiated by a remote controller.
1712:             * 
1713:             * @param currentTid Current transaction Id
1714:             * @return true if the transaction with Id currentTid has already been
1715:             *         started on the current controller
1716:             */
1717:            public boolean isDistributedTransaction(long currentTid) {
1718:                synchronized (distributedTransactions) {
1719:                    return distributedTransactions
1720:                            .contains(new Long(currentTid));
1721:                }
1722:            }
1723:
1724:            /**
1725:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#resumeActivity()
1726:             */
1727:            public void resumeActivity() {
1728:                // Perform the distributed call through the group-comm, in order to
1729:                // resume all the activity suspended above.
1730:                try {
1731:                    // Resume transactions
1732:                    dvdb.getMulticastRequestAdapter().multicastMessage(
1733:                            dvdb.getAllMembers(),
1734:                            new ResumeActivity(),
1735:                            MulticastRequestAdapter.WAIT_ALL,
1736:                            dvdb.getMessageTimeouts()
1737:                                    .getDisableBackendTimeout());
1738:                    logger.info("All activity is now resumed for "
1739:                            + dvdb.getDatabaseName());
1740:                } catch (Exception e) {
1741:                    String msg = "Error while resuming activity";
1742:                    logger.error(msg, e);
1743:                }
1744:            }
1745:
1746:            /**
1747:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#suspendActivity()
1748:             */
1749:            public void suspendActivity() throws SQLException {
1750:                // Perform the distributed call through the group-comm, in order to
1751:                // atomically suspend all activity on the system.
1752:                try {
1753:                    // Suspend transactions
1754:                    dvdb.getMulticastRequestAdapter().multicastMessage(
1755:                            dvdb.getAllMembers(),
1756:                            new SuspendActivity(),
1757:                            MulticastRequestAdapter.WAIT_ALL,
1758:                            dvdb.getMessageTimeouts()
1759:                                    .getDisableBackendTimeout());
1760:                    logger.info("All activity is suspended for "
1761:                            + dvdb.getDatabaseName());
1762:                } catch (Exception e) {
1763:                    String msg = "Error while suspending activity";
1764:                    logger.error(msg, e);
1765:                    throw (SQLException) new SQLException(msg + "(" + e + ")")
1766:                            .initCause(e);
1767:                }
1768:            }
1769:
1770:            /**
1771:             * Suspend all transactions, writes and persistent connections and 
1772:             * <strong>do not wait for completion of in-flight transactions 
1773:             * and/or persistent connections</strong>.
1774:             * <p>
1775:             * This method blocks activity in the cluster and should not be called
1776:             * under normal cluster conditions. It should be reserved for extraordinary
1777:             * situations such as network partition detection &amp; reconciliation.
1778:             * </p>
1779:             * 
1780:             * @throws SQLException if an error occurred
1781:             */
1782:            public void blockActivity() throws SQLException {
1783:                try {
1784:                    dvdb.getMulticastRequestAdapter().multicastMessage(
1785:                            dvdb.getAllMembers(),
1786:                            new BlockActivity(),
1787:                            MulticastRequestAdapter.WAIT_ALL,
1788:                            dvdb.getMessageTimeouts()
1789:                                    .getDisableBackendTimeout());
1790:                    logger.info("All activity is blocked for "
1791:                            + dvdb.getDatabaseName());
1792:                } catch (Exception e) {
1793:                    String msg = "Error while blocking activity";
1794:                    logger.error(msg, e);
1795:                    throw (SQLException) new SQLException(msg + "(" + e + ")")
1796:                            .initCause(e);
1797:                }
1798:            }
1799:
1800:            /**
1801:             * Notify controllers that they are now inconsistent with the cluster and that
1802:             * they should disable themselves.
1803:             * 
1804:             * @param request request that generated the inconsistency
1805:             * @param inconsistentControllers controllers that need to be notified
1806:             * @throws SQLException if an error occurs
1807:             */
1808:            protected void notifyControllerInconsistency(
1809:                    AbstractRequest request, ArrayList inconsistentControllers)
1810:                    throws SQLException {
1811:                try {
1812:                    dvdb.getMulticastRequestAdapter().multicastMessage(
1813:                            inconsistentControllers,
1814:                            new NotifyInconsistency(request),
1815:                            MulticastRequestAdapter.WAIT_ALL, 0);
1816:                } catch (Exception e) {
1817:                    String msg = "An error occurred while notifying controllers ("
1818:                            + inconsistentControllers
1819:                            + ") of inconsistency due to distributed request "
1820:                            + request.getId();
1821:                    logger.warn(msg, e);
1822:                    throw new SQLException(msg + " (" + e + ")");
1823:                }
1824:            }
1825:
1826:            /**
1827:             * Notify a set of backends of the query completion.
1828:             * 
1829:             * @param request the request that has completed
1830:             * @param success true if the request has successfully completed
1831:             * @param disableBackendOnSuccess disable all local backends if query was
1832:             *          successful (but failed locally). Usually set to true in case of
1833:             *          AllBackendsFailedException and false for NoMoreBackendException.
1834:             * @param backendsToNotify list of backends to notify (returns right away if
1835:             *          the list is null)
1836:             * @throws SQLException if an error occurs
1837:             */
1838:            protected void notifyRequestCompletion(AbstractRequest request,
1839:                    boolean success, boolean disableBackendOnSuccess,
1840:                    ArrayList backendsToNotify) throws SQLException {
1841:                if (backendsToNotify == null)
1842:                    return;
1843:                try {
1844:                    dvdb.getMulticastRequestAdapter().multicastMessage(
1845:                            backendsToNotify,
1846:                            new NotifyCompletion(request, success,
1847:                                    disableBackendOnSuccess),
1848:                            MulticastRequestAdapter.WAIT_ALL,
1849:                            dvdb.getMessageTimeouts()
1850:                                    .getNotifyCompletionTimeout());
1851:                } catch (Exception e) {
1852:                    String msg = "An error occurred while notifying all controllers of failure of distributed request "
1853:                            + request.getId();
1854:                    logger.warn(msg, e);
1855:                    throw new SQLException(msg + " (" + e + ")");
1856:                }
1857:            }
1858:
1859:            /**
1860:             * Notify a set of backends of the query completion.
1861:             * 
1862:             * @param request the request that has completed
1863:             * @param success true if the request has successfully completed
1864:             * @param disableBackendOnSuccess disable all local backends if query was
1865:             *          successful (but failed locally). Usually set to true in case of
1866:             *          AllBackendsFailedException and false for NoMoreBackendException.
1867:             * @param backendsToNotify list of backends to notify (returns right away if
1868:             *          the list is null)
1869:             * @param requestUpdateCount the request update count to be logged if it
1870:             *          succeeded somewhere
1871:             * @throws SQLException if an error occurs
1872:             */
1873:            protected void notifyRequestCompletion(AbstractRequest request,
1874:                    boolean success, boolean disableBackendOnSuccess,
1875:                    ArrayList backendsToNotify, int requestUpdateCount)
1876:                    throws SQLException {
1877:                if (backendsToNotify == null)
1878:                    return;
1879:                try {
1880:                    dvdb.getMulticastRequestAdapter()
1881:                            .multicastMessage(
1882:                                    backendsToNotify,
1883:                                    new NotifyCompletion(request, success,
1884:                                            disableBackendOnSuccess,
1885:                                            requestUpdateCount),
1886:                                    MulticastRequestAdapter.WAIT_ALL,
1887:                                    dvdb.getMessageTimeouts()
1888:                                            .getNotifyCompletionTimeout());
1889:                } catch (Exception e) {
1890:                    String msg = "An error occurred while notifying all controllers of failure of distributed request "
1891:                            + request.getId();
1892:                    logger.warn(msg, e);
1893:                    throw new SQLException(msg + " (" + e + ")");
1894:                }
1895:            }
1896:
1897:            /**
1898:             * Cleanup all queries from a given transaction that were registered as failed
1899:             * and issued from the other controller. This is used when a controller has
1900:             * failed and no status information will be returned for failed queries.
1901:             * Queries are systematically tagged as failed. This method is called only for
1902:             * failover during a rollback / abort to properly close the transaction on the
1903:             * remaining controller.
1904:             * 
1905:             * @param tId the transaction id that we are looking for
1906:             */
1907:            public void cleanupRollbackFromOtherController(long tId) {
1908:                long cid = this.getControllerId();
1909:                synchronized (failedOnAllBackends) {
1910:                    for (Iterator iter = failedOnAllBackends.keySet()
1911:                            .iterator(); iter.hasNext();) {
1912:                        AbstractRequest request = (AbstractRequest) iter.next();
1913:                        // Issued by another controller AND part of transaction tId
1913:                        if ((((request.getId() & CONTROLLER_ID_BIT_MASK) != cid)
1914:                                || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) != cid))
1915:                                && request.getTransactionId() == tId) { // Need to remove that entry
1916:                            if (logger.isInfoEnabled())
1917:                                logger
1918:                                        .info("Failover while rolling back the transaction "
1919:                                                + tId
1920:                                                + " detected. No status information received for request "
1921:                                                + request
1922:                                                + ", considering status as failed.");
1923:                            FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
1924:                                    .get(request);
1925:
1926:                            // If the current request is a rollback / abort, we need to call
1927:                            // logRequestCompletionAndNotifyScheduler with success set to true
1928:                            // for the transaction to be correctly cleaned up
1929:                            boolean isAbortOrRollback = (request instanceof UnknownWriteRequest)
1930:                                    && ("rollback".equals(request
1931:                                            .getSqlOrTemplate()) || "abort"
1932:                                            .equals(request.getSqlOrTemplate()));
1933:
1934:                            logRequestCompletionAndNotifyScheduler(request,
1935:                                    isAbortOrRollback, failureInfo, -1);
1936:                            iter.remove();
1937:                        }
1938:                    }
1939:                }
1940:            }
1941:        }
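
The filter in cleanupRollbackFromOtherController combines two controller-id mask tests with a transaction-id match. The sketch below is a standalone illustration, not Sequoia code. It shows how Java groups an unparenthesized `a || b && c` as `a || (b && c)`, and how explicit parentheses restrict the match to a single transaction. The mask value, the id layout (controller id in the upper 16 bits) and all helper names are assumptions made for the example only.

```java
public class ControllerIdFilterSketch {
    // Hypothetical mask: assumes the controller id occupies the upper 16 bits.
    static final long CONTROLLER_ID_BIT_MASK = 0xFFFF000000000000L;

    // True if the id carries a controller id different from controllerId.
    static boolean issuedByOtherController(long id, long controllerId) {
        return (id & CONTROLLER_ID_BIT_MASK) != controllerId;
    }

    // No parentheses around the ||: Java evaluates this as a || (b && c),
    // because && binds more tightly than ||.
    static boolean implicitGrouping(long requestId, long requestTid,
            long cid, long tId) {
        return issuedByOtherController(requestId, cid)
                || issuedByOtherController(requestTid, cid)
                && requestTid == tId;
    }

    // Explicit grouping: (a || b) && c, so the transaction id must match.
    static boolean explicitGrouping(long requestId, long requestTid,
            long cid, long tId) {
        return (issuedByOtherController(requestId, cid)
                || issuedByOtherController(requestTid, cid))
                && requestTid == tId;
    }

    public static void main(String[] args) {
        long local = 0x0001000000000000L;   // hypothetical local controller id
        long remote = 0x0002000000000000L;  // hypothetical remote controller id
        long tId = remote | 42L;            // transaction being rolled back
        long otherTid = remote | 7L;        // unrelated remote transaction
        long requestId = remote | 1000L;    // request issued by the remote controller

        // Remote request that belongs to an unrelated transaction.
        System.out.println(implicitGrouping(requestId, otherTid, local, tId)); // true
        System.out.println(explicitGrouping(requestId, otherTid, local, tId)); // false

        // Remote request that belongs to transaction tId.
        System.out.println(explicitGrouping(requestId, tId, local, tId));      // true
    }
}
```

With the implicit grouping, any request issued by another controller matches regardless of its transaction. The explicit form matches only requests that belong to the transaction passed in.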