Source Code Cross Referenced for AbstractScheduler.java in  » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » scheduler » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.scheduler 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004:         * Science And Control (INRIA).
0005:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006:         * Copyright (C) 2006 Continuent, Inc.
0007:         * Contact: sequoia@continuent.org
0008:         *
0009:         * Licensed under the Apache License, Version 2.0 (the "License");
0010:         * you may not use this file except in compliance with the License.
0011:         * You may obtain a copy of the License at
0012:         *
0013:         * http://www.apache.org/licenses/LICENSE-2.0
0014:         *
0015:         * Unless required by applicable law or agreed to in writing, software
0016:         * distributed under the License is distributed on an "AS IS" BASIS,
0017:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018:         * See the License for the specific language governing permissions and
0019:         * limitations under the License.
0020:         *
0021:         * Initial developer(s): Emmanuel Cecchet.
0022:         * Contributor(s): Jean-Bernard van Zuylen, Peter Royal, Damian Arregui.
0023:         */package org.continuent.sequoia.controller.scheduler;
0024:
0025:        import java.sql.SQLException;
0026:        import java.util.ArrayList;
0027:        import java.util.HashMap;
0028:        import java.util.HashSet;
0029:        import java.util.Hashtable;
0030:        import java.util.Iterator;
0031:        import java.util.List;
0032:        import java.util.Map;
0033:        import java.util.Set;
0034:
0035:        import org.continuent.sequoia.common.exceptions.RollbackException;
0036:        import org.continuent.sequoia.common.exceptions.VDBisShuttingDownException;
0037:        import org.continuent.sequoia.common.i18n.Translate;
0038:        import org.continuent.sequoia.common.log.Trace;
0039:        import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0040:        import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0041:        import org.continuent.sequoia.common.xml.XmlComponent;
0042:        import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0043:        import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
0044:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0045:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0046:        import org.continuent.sequoia.controller.requests.SelectRequest;
0047:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0048:        import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0049:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0050:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0051:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0052:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0053:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0054:        import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0055:
0056:        /**
0057:         * The Request Scheduler should schedule the request according to a given
0058:         * policy.
0059:         * <p>
0060:         * The requests come from the Request Controller and are sent later to the next
0061:         * controller components (cache and load balancer).
0062:         *
0063:         * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0064:         * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0065:         *         </a>
0066:         * @author <a href="mailto:damian.arregui@continuent.com">Damian Arregui</a>
0067:         * @version 1.0
0068:         */
0069:        public abstract class AbstractScheduler implements  XmlComponent {
0070:
0071:            //
0072:            // How is the code organized?
0073:            //
0074:            // 1. Member variables
0075:            // 2. Constructor
0076:            // 3. Getter/Setter
0077:            // 4. Request handling
0078:            // 5. Transaction management
0079:            // 6. Checkpoint management
0080:            // 7. Debug/Monitoring
0081:            //
0082:
0083:            //
0084:            // 1. Member variables
0085:            //
0086:
0087:            // Request handling
0088:            private int suspendWrites = 0; // a new write may be held back while this is > 0
0089:            private int pendingWrites = 0; // decremented under writesSync when a write or stored procedure completes
0090:            private final Object writesSync = new Object(); // monitor for the write counters; suspended writes wait on it
0091:            private final Object endOfCurrentWrites = new Object();
0092:            /**
0093:             * Read requests only account for SelectRequest objects (stored procedures
0094:             * even with a read-only semantic will be in the write requests list).
0095:             */
0096:            private Map activeReadRequests = new HashMap(); // request id (Long) -> SelectRequest; also the monitor used to wait for read completion
0097:
0098:            /**
0099:             * Write requests also include stored procedures.
0100:             */
0101:            private Map activeWriteRequests = new HashMap(); // request id (Long) -> request; also the monitor used to wait for write completion
0102:            private Set suspendedRequests = new HashSet();
0103:
0104:            // Transaction management
0105:            private long controllerId = 0; // OR-ed into the transaction ids generated for autocommit requests
0106:            private long transactionId = 0;
0107:            private int savepointId = 0;
0108:            private int suspendTransactions = 0;
0109:            private int pendingTransactions = 0;
0110:            private final Object transactionsSync = new Object();
0111:            private final Object endOfCurrentTransactions = new Object();
0112:            private List activeTransactions = new ArrayList();
0113:            private long waitForSuspendedTransactionsTimeout; // constructor-supplied; the deprecated constructor uses 5 * 60 * 1000
0114:
0115:            // Persistent connection management
0116:            private int suspendNewPersistentConnections = 0;
0117:            private int suspendOpenClosePersistentConnections = 0;
0118:            private int pendingOpenClosePersistentConnections = 0;
0119:            private final Object persistentConnectionsSync = new Object();
0120:            private final Object suspendOpenClosePersistentConnectionSync = new Object();
0121:            private final Object endOfCurrentPersistentConnections = new Object();
0122:            private long waitForPersistentConnectionsTimeout; // constructor-supplied; the deprecated constructor uses 5 * 60 * 1000
0123:
0124:            /**
0125:             * List of persistent connections that have been created <br>
0126:             * persistentConnectionId (Long) -> vLogin (String)
0127:             */
0128:            protected Hashtable activePersistentConnections = new Hashtable();
0129:
0130:            // Monitoring values
0131:            private int numberRead = 0; // incremented by readCompleted
0132:            private int numberWrite = 0; // incremented by writeCompleted and storedProcedureCompleted
0133:
0134:            // Other
0135:            protected int raidbLevel;
0136:            protected int parsingGranularity;
0137:
0138:            /** Reference to the associated distributed virtual database */
0139:            private VirtualDatabase vdb = null; // stays null when the deprecated two-argument constructor is used
0140:
0141:            private static final int INITIAL_WAIT_TIME = 15000; // NOTE(review): presumably milliseconds - confirm against its users
0142:            protected static Trace logger = Trace
0143:                    .getLogger("org.continuent.sequoia.controller.scheduler");
0144:
0145:            //
0146:            // 2. Constructor
0147:            //
0148:
0149:            /**
0150:             * Default scheduler to assign scheduler RAIDb level, needed granularity and
0151:             * SQL macro handling (on-the-fly instantiation of NOW(), RAND(), ...).
0152:             *
0153:             * @param raidbLevel RAIDb level of this scheduler
0154:             * @param parsingGranularity Parsing granularity needed by the scheduler
0155:             * @param vdb virtual database using this scheduler (needed to access its
0156:             *          total order queue)
0157:             */
0158:            public AbstractScheduler(int raidbLevel, int parsingGranularity,
0159:                    VirtualDatabase vdb, long waitForSuspendedTransactionsTimeout,
0160:                    long waitForPersistentConnectionsTimeout) {
0161:                // Plain field initialisation: the arguments are stored as given.
0162:                this.vdb = vdb;
0163:                this.raidbLevel = raidbLevel;
0164:                this.parsingGranularity = parsingGranularity;
0165:                this.waitForPersistentConnectionsTimeout = waitForPersistentConnectionsTimeout;
0166:                this.waitForSuspendedTransactionsTimeout = waitForSuspendedTransactionsTimeout;
0167:            }
0168:
0169:            /**
0170:             * Default scheduler to assign scheduler RAIDb level, needed granularity and
0171:             * SQL macro handling (on-the-fly instantiation of NOW(), RAND(), ...).
0172:             *
0173:             * @param raidbLevel RAIDb level of this scheduler
0174:             * @param parsingGranularity Parsing granularity needed by the scheduler
0175:             * @deprecated This constructor is used only by unsupported scheduler
0176:             *             sub-classes.
0177:             */
0178:            public AbstractScheduler(int raidbLevel, int parsingGranularity) {
0179:                // Delegate to the main constructor: no virtual database, and both
0180:                // wait timeouts set to 5 * 60 * 1000.
0181:                this(raidbLevel, parsingGranularity, null, 5 * 60 * 1000,
0182:                        5 * 60 * 1000);
0183:            }
0184:
0185:            //
0186:            // 3. Getter/Setter methods
0187:            //
0188:
0189:            /**
0190:             * Get the needed query parsing granularity.
0191:             *
0192:             * @return needed query parsing granularity
0193:             */
0194:            public final int getParsingGranularity() {
0195:                return this.parsingGranularity; // plain field read
0196:            }
0197:
0198:            /**
0199:             * Assigns the local controllerId. It is used for generating transactionIds
0200:             * for autocommit requests.
0201:             *
0202:             * @param controllerId for this controller
0203:             */
0204:            public void setControllerId(long controllerId) {
0205:                this.controllerId = controllerId; // later OR-ed into generated transaction ids
0206:            }
0207:
0208:            /**
0209:             * Returns the RAIDbLevel.
0210:             *
0211:             * @return int
0212:             */
0213:            public final int getRAIDbLevel() {
0214:                return this.raidbLevel; // plain field read
0215:            }
0216:
0217:            /**
0218:             * Sets the <code>DatabaseSchema</code> of the current virtual database.
0219:             * This is only needed by some schedulers that will have to define their own
0220:             * scheduler schema
0221:             *
0222:             * @param dbs a <code>DatabaseSchema</code> value
0223:             * @see org.continuent.sequoia.controller.scheduler.schema.SchedulerDatabaseSchema
0224:             */
0225:            public void setDatabaseSchema(DatabaseSchema dbs) {
0226:                if (!logger.isInfoEnabled())
0227:                    return; // nothing to report at this log level; dbs is not used
0228:                logger.info(Translate.get("scheduler.doesnt.support.schemas"));
0229:            }
0230:
0231:            /**
0232:             * Merge the given <code>DatabaseSchema</code> with the current one.
0233:             *
0234:             * @param dbs a <code>DatabaseSchema</code> value
0235:             * @see org.continuent.sequoia.controller.scheduler.schema.SchedulerDatabaseSchema
0236:             */
0237:            public void mergeDatabaseSchema(DatabaseSchema dbs) {
                     // This base implementation ignores dbs and only logs a notice.
                     // The message lookup is skipped when info logging is disabled,
                     // matching setDatabaseSchema.
                     if (logger.isInfoEnabled())
0238:                    logger.info(Translate.get("scheduler.doesnt.support.schemas"));
0239:            }
0240:
0241:            //
0242:            // 4. Request handling
0243:            //
0244:
0245:            /**
0246:             * Returns the list of active read requests <request id, SelectRequest>.
0247:             *
0248:             * @return Returns the active read requests.
0249:             */
0250:            public final Map getActiveReadRequests() {
0251:                return this.activeReadRequests; // the live map itself, not a copy
0252:            }
0253:
0254:            /**
0255:             * Returns the list of active write requests <request id, AbstractRequest>.
0256:             * Write requests can be either StoredProcedure or AbstractWriteRequest
0257:             * objects.
0258:             *
0259:             * @return Returns the active write requests.
0260:             */
0261:            public final Map getActiveWriteRequests() {
0262:                return this.activeWriteRequests; // the live map itself, not a copy
0263:            }
0264:
0265:            /**
0266:             * Returns the number of pending writes.
0267:             *
0268:             * @return int
0269:             */
0270:            public final int getPendingWrites() {
0271:                return this.pendingWrites; // read without taking writesSync
0272:            }
0273:
0274:            /**
0275:             * Returns true if the given request id is in the active request list.
0276:             *
0277:             * @param requestId the request unique id
0278:             * @return true if the request is active, false otherwise
0279:             */
0280:            public boolean isActiveRequest(long requestId) {
0281:                final Long key = new Long(requestId);
                     boolean knownRead;
0282:                // Look in the read map first, under its own monitor.
0283:                synchronized (activeReadRequests) {
0284:                    knownRead = activeReadRequests.containsKey(key);
0285:                }
0286:                if (knownRead) { return true; }
0287:                synchronized (activeWriteRequests) {
0288:                    return activeWriteRequests.containsKey(key);
                     }
0289:            }
0290:
0291:            /**
0292:             * Wait for the completion of the given request id. The method returns as soon
0293:             * as the request completion has been notified to the scheduler.
0294:             *
0295:             * @param requestId the unique request identifier
0296:             */
0297:            public void waitForRequestCompletion(long requestId) {
0298:                Long lId = new Long(requestId);
                     // wait() clears the thread's interrupt status when it throws.
                     // Record the interruption here and restore it once the request is
                     // gone from both maps, so callers can still observe it.
                     boolean interrupted = false;
0299:                synchronized (activeReadRequests) {
0300:                    while (activeReadRequests.containsKey(lId)) {
0301:                        try {
0302:                            activeReadRequests.wait();
0303:                        } catch (InterruptedException e) {
                                 interrupted = true; // keep waiting, as before
0304:                        }
0305:                    }
0306:                }
0307:                synchronized (activeWriteRequests) {
0308:                    while (activeWriteRequests.containsKey(lId)) {
0309:                        try {
0310:                            activeWriteRequests.wait();
0311:                        } catch (InterruptedException e) {
                                 interrupted = true; // keep waiting, as before
0312:                        }
0313:                    }
0314:                }
                     if (interrupted)
                         Thread.currentThread().interrupt();
0315:            }
0316:
0317:            /**
0318:             * Schedule a read request
0319:             *
0320:             * @param request select request to schedule
0321:             * @exception SQLException if a timeout occurs or a query with the same id has
0322:             *              already been scheduled.
0323:             */
0324:            public void scheduleReadRequest(SelectRequest request)
0325:                    throws SQLException {
0326:                final Long requestKey = new Long(request.getId());
0327:                // Register the request first; an id that is already present is rejected.
0328:                synchronized (activeReadRequests) {
0329:                    if (activeReadRequests.containsKey(requestKey))
0330:                        throw new SQLException("A query with id " + requestKey
0331:                                + " has already been scheduled");
0332:                    activeReadRequests.put(requestKey, request);
0333:                }
0334:
0335:                // Autocommit requests that must be broadcast get a generated transaction
0336:                // id: the next id, masked, then OR-ed with this controller's id.
0337:                if (request.isAutoCommit() && request.isMustBroadcast()) {
0338:                    long maskedTid = getNextTransactionId()
0339:                            & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
0340:                    request.setTransactionId(maskedTid | controllerId);
0341:                }
0342:
0343:                try {
0344:                    scheduleNonSuspendedReadRequest(request);
0345:                } catch (SQLException schedulingFailure) {
0346:                    // Scheduling failed: undo the registration, then propagate.
0347:                    synchronized (activeReadRequests) {
0348:                        activeReadRequests.remove(requestKey);
0349:                    }
0350:                    throw schedulingFailure;
0351:                }
0352:            }
0353:
0354:            /**
0355:             * Schedule a read request (implementation specific)
0356:             *
0357:             * @param request Select request to schedule (SQL macros are already handled
0358:             *          if needed)
0359:             * @exception SQLException if a timeout occurs
0360:             */
0361:            protected abstract void scheduleNonSuspendedReadRequest(
0362:                    SelectRequest request) throws SQLException; // called by scheduleReadRequest once the request is registered in activeReadRequests
0363:
0364:            /**
0365:             * Notify the completion of a read statement.
0366:             *
0367:             * @param request the completed request
0368:             * @throws SQLException if the query was not in the list of active read
0369:             *           requests (not scheduled)
0370:             */
0371:            public final void readCompleted(SelectRequest request)
0372:                    throws SQLException {
0373:                Long id = new Long(request.getId());
0374:                synchronized (activeReadRequests) {
0375:                    if (activeReadRequests.remove(id) == null)
0376:                        throw new SQLException("Query " + id
0377:                                + " is not in the list of currently scheduled queries");
0378:                    activeReadRequests.notifyAll(); // wakes threads in waitForRequestCompletion
0379:                    // Count the completion while the monitor is still held: numberRead++ is
0380:                    // a read-modify-write, and concurrent completions could lose an update.
0381:                    numberRead++;
0382:                }
0383:                this.readCompletedNotify(request);
0384:            }
0385:
0386:            /**
0387:             * Notify the completion of a read statement.
0388:             *
0389:             * @param request the completed request
0390:             */
0391:            protected abstract void readCompletedNotify(SelectRequest request); // called by readCompleted after the request was removed from activeReadRequests
0392:
0393:            /**
0394:             * Schedule a write request. This method blocks if the writes are suspended.
0395:             * Then the number of pending writes is updated and the implementation
0396:             * specific scheduleNonSuspendedWriteRequest function is called. SQL macros
0397:             * are replaced in the request if the scheduler has needSQLMacroHandling set
0398:             * to true.
0399:             *
0400:             * @param request Write request to schedule
0401:             * @exception SQLException if a timeout occurs or a query with the same id has
0402:             *              already been scheduled.
0403:             * @exception RollbackException if an error occurs
0404:             * @see #scheduleNonSuspendedWriteRequest(AbstractWriteRequest)
0405:             */
0406:            public final void scheduleWriteRequest(AbstractWriteRequest request)
0407:                    throws SQLException, RollbackException {
0408:                // Common bookkeeping first, then the scheduler-specific part.
0409:                suspendWriteIfNeededAndAddQueryToActiveRequests(request);
0410:                scheduleNonSuspendedWriteRequest(request);
0411:
0412:                if (!request.isAutoCommit())
0413:                    return;
0414:                // Autocommit: generate a transaction id from the next id, masked, then
0415:                // OR-ed with this controller's id.
0416:                long maskedTid = getNextTransactionId()
0417:                        & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
0418:                request.setTransactionId(maskedTid | controllerId);
0419:            }
0420:
0421:            /**
0422:             * Schedule a write request (implementation specific). This method blocks
0423:             * until the request can be executed.
0424:             *
0425:             * @param request Write request to schedule (SQL macros are already handled if
0426:             *          needed)
0427:             * @exception SQLException if a timeout occurs
0428:             * @exception RollbackException if the transaction must be rolled back
0429:             */
0430:            protected abstract void scheduleNonSuspendedWriteRequest(
0431:                    AbstractWriteRequest request) throws SQLException,
0432:                    RollbackException; // called by scheduleWriteRequest right after suspendWriteIfNeededAndAddQueryToActiveRequests
0433:
0434:            /**
0435:             * Notify the completion of a write statement.
0436:             * <p>
0437:             * This method updates the number of pending writes and calls the
0438:             * implementation specific notifyWriteCompleted function.
0439:             * <p>
0440:             * Finally, the suspendWrites() function is notified if needed.
0441:             *
0442:             * @param request the completed request
0443:             * @throws SQLException if the query is not in the list of scheduled queries
0444:             * @see #notifyWriteCompleted(AbstractWriteRequest)
0445:             * @see #checkPendingWrites()
0446:             */
0447:            public final void writeCompleted(AbstractWriteRequest request)
0448:                    throws SQLException {
0449:                Long id = new Long(request.getId());
0450:
0451:                synchronized (writesSync) {
0452:                    synchronized (activeWriteRequests) {
0453:                        if (activeWriteRequests.remove(id) == null)
0454:                            throw new SQLException(
0455:                                    "Query "
0456:                                            + id
0457:                                            + " is not in the list of currently scheduled queries");
0458:
0459:                        activeWriteRequests.notifyAll();
0460:                    }
0461:                    pendingWrites--;
0462:
0463:                    if (pendingWrites < 0) {
0464:                        logger
0465:                                .error("Negative pending writes detected on write request completion ("
0466:                                        + request + ")");
0467:                        pendingWrites = 0;
0468:                    }
0469:
0470:                    if (logger.isDebugEnabled())
0471:                        logger
0472:                                .debug("Write completed, remaining pending writes: "
0473:                                        + pendingWrites);
0474:
0475:                    notifyWriteCompleted(request);
0476:
0477:                    checkPendingWrites();
0478:                }
0479:                numberWrite++;
0480:            }
0481:
0482:            /**
0483:             * Notify the completion of a write statement. This method does not need to be
0484:             * synchronized, it is enforced by the caller.
0485:             *
0486:             * @param request the completed request
0487:             * @see #writeCompleted(AbstractWriteRequest)
0488:             */
0489:            protected abstract void notifyWriteCompleted(
0490:                    AbstractWriteRequest request); // called by writeCompleted while writesSync is held
0491:
0492:            /**
0493:             * Schedule a stored procedure. This method blocks if the writes are suspended.
0494:             * Then the number of pending writes is updated and the implementation
0495:             * specific scheduleNonSuspendedStoredProcedure function is called. SQL macros
0496:             * are replaced in the request if the scheduler has needSQLMacroHandling set
0497:             * to true.
0498:             *
0499:             * @param proc Stored procedure to schedule
0500:             * @exception SQLException if a timeout occurs
0501:             * @exception RollbackException if an error occurs
0502:             * @see #scheduleNonSuspendedStoredProcedure(StoredProcedure)
0503:             */
0504:            public final void scheduleStoredProcedure(StoredProcedure proc)
0505:                    throws SQLException, RollbackException {
0506:                // Common bookkeeping first, then the scheduler-specific part.
0507:                suspendWriteIfNeededAndAddQueryToActiveRequests(proc);
0508:                scheduleNonSuspendedStoredProcedure(proc);
0509:
0510:                if (!proc.isAutoCommit())
0511:                    return;
0512:                // Autocommit: generate a transaction id from the next id, masked, then
0513:                // OR-ed with this controller's id.
0514:                long maskedTid = getNextTransactionId()
0515:                        & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
0516:                proc.setTransactionId(maskedTid | controllerId);
0517:            }
0518:
0519:            /**
0520:             * Schedule a stored procedure (implementation specific). This method blocks
0521:             * until the request can be executed.
0522:             *
0523:             * @param proc Stored procedure to schedule
0524:             * @exception SQLException if a timeout occurs
0525:             * @exception RollbackException if the transaction must be rolled back
0526:             */
0527:            protected abstract void scheduleNonSuspendedStoredProcedure(
0528:                    StoredProcedure proc) throws SQLException,
0529:                    RollbackException; // called by scheduleStoredProcedure right after suspendWriteIfNeededAndAddQueryToActiveRequests
0530:
0531:            /**
0532:             * Notify the completion of a stored procedure.
0533:             * <p>
0534:             * This method updates the number of pending writes and calls the
0535:             * implementation specific notifyStoredProcedureCompleted function.
0536:             * <p>
0537:             * Finally, the suspendWrites() function is notified if needed.
0538:             *
0539:             * @param proc the completed stored procedure
0540:             * @throws SQLException if the stored procedure was not scheduled before (not
0541:             *           in the active request list)
0542:             * @see #notifyStoredProcedureCompleted(StoredProcedure)
0543:             * @see #checkPendingWrites()
0544:             */
0545:            public final void storedProcedureCompleted(StoredProcedure proc)
0546:                    throws SQLException {
0547:                Long id = new Long(proc.getId());
0548:
0549:                synchronized (writesSync) {
0550:                    synchronized (activeWriteRequests) {
0551:                        if (activeWriteRequests.remove(id) == null)
0552:                            throw new SQLException(
0553:                                    "Query "
0554:                                            + id
0555:                                            + " is not in the list of currently scheduled queries");
0556:
0557:                        activeWriteRequests.notifyAll();
0558:                    }
0559:
0560:                    pendingWrites--;
0561:
0562:                    if (pendingWrites < 0) {
0563:                        logger
0564:                                .error("Negative pending writes detected on stored procedure completion ("
0565:                                        + proc + ")");
0566:                        pendingWrites = 0;
0567:                    }
0568:
0569:                    if (logger.isDebugEnabled())
0570:                        logger
0571:                                .debug("Stored procedure completed, remaining pending writes: "
0572:                                        + pendingWrites);
0573:
0574:                    notifyStoredProcedureCompleted(proc);
0575:
0576:                    checkPendingWrites();
0577:                }
0578:                numberWrite++;
0579:            }
0580:
0581:            /**
0582:             * Notify the completion of a stored procedure. This method does not need to
0583:             * be synchronized, it is enforced by the caller.
0584:             *
0585:             * @param proc the completed stored procedure
0586:             * @see #storedProcedureCompleted(StoredProcedure)
0587:             */
0588:            protected abstract void notifyStoredProcedureCompleted(
0589:                    StoredProcedure proc); // called by storedProcedureCompleted while writesSync is held
0590:
0591:            /**
0592:             * Suspend write requests if suspendedWrites is active. Adds the request to
0593:             * the list of active requests after successful scheduling.
0594:             *
0595:             * @param request the request to suspend (a write request or a stored
0596:             *          procedure)
0597:             * @throws SQLException if the request timeout has expired or a query with the
0598:             *           same id has already been scheduled.
0599:             */
0600:            private void suspendWriteIfNeededAndAddQueryToActiveRequests(
0601:                    AbstractRequest request) throws SQLException {
0602:                Long id = new Long(request.getId());
0603:
0604:                synchronized (writesSync) {
0605:                    if (suspendWrites > 0) {
0606:                        // Let requests in active transactions to execute since they might
0607:                        // unblock queries of other transactions.
0608:                        boolean mustBeSuspended = !request
0609:                                .isPersistentConnection()
0610:                                && (request.isAutoCommit() || !activeTransactions
0611:                                        .contains(new TransactionMetaData(
0612:                                                request.getTransactionId(),
0613:                                                0,
0614:                                                request.getLogin(),
0615:                                                request
0616:                                                        .isPersistentConnection(),
0617:                                                request
0618:                                                        .getPersistentConnectionId())));
0619:
0620:                        if (mustBeSuspended) {
0621:                            addSuspendedRequest(request);
0622:                            try {
0623:                                // Wait on writesSync
0624:                                int timeout = request.getTimeout();
0625:                                if (timeout > 0) {
0626:                                    long start = System.currentTimeMillis();
0627:                                    long lTimeout = timeout * 1000L;
0628:                                    writesSync.wait(lTimeout);
0629:                                    long end = System.currentTimeMillis();
0630:                                    int remaining = (int) (lTimeout - (end - start));
0631:                                    if (remaining > 0)
0632:                                        request.setTimeout(remaining);
0633:                                    else {
0634:                                        String msg = Translate
0635:                                                .get(
0636:                                                        "scheduler.request.timeout",
0637:                                                        new String[] {
0638:                                                                String
0639:                                                                        .valueOf(request
0640:                                                                                .getId()),
0641:                                                                String
0642:                                                                        .valueOf(request
0643:                                                                                .getTimeout()),
0644:                                                                String
0645:                                                                        .valueOf(pendingWrites) });
0646:                                        logger.warn(msg);
0647:                                        throw new SQLException(msg);
0648:                                    }
0649:                                } else
0650:                                    this .writesSync.wait();
0651:                            } catch (InterruptedException e) {
0652:                                String msg = Translate.get(
0653:                                        "scheduler.request.timeout.failed", e);
0654:                                logger.warn(msg);
0655:                                throw new SQLException(msg);
0656:                            }
0657:                        }
0658:                    }
0659:
0660:                    synchronized (activeWriteRequests) {
0661:                        if (activeWriteRequests.containsKey(id))
0662:                            throw new SQLException("A query with id " + id
0663:                                    + " has already been scheduled");
0664:
0665:                        activeWriteRequests.put(id, request);
0666:                    }
0667:                    pendingWrites++;
0668:
0669:                    if (logger.isDebugEnabled())
0670:                        logger
0671:                                .debug("Schedule " + request.getUniqueKey()
0672:                                        + " - Current pending writes: "
0673:                                        + pendingWrites);
0674:                }
0675:            }
0676:
0677:            /**
0678:             * Signals the start of a persistent connection opening operation.
0679:             *
0680:             * @param dmsg distributed message which triggered this operation
0681:             */
0682:            public void scheduleOpenPersistentConnection(
0683:                    DistributedOpenPersistentConnection dmsg) {
0684:                checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount();
0685:
0686:                // Underlying Hashtable is synchronized and we systematically overwrite
0687:                // any previous value, it is as fast as checking first.
0688:                // Check if persistent connections creation is suspended
0689:                synchronized (persistentConnectionsSync) {
0690:                    if (suspendNewPersistentConnections > 0) {
0691:                        addSuspendedRequest(dmsg);
0692:                        try {
0693:                            persistentConnectionsSync.wait();
0694:                        } catch (InterruptedException e) {
0695:                            e.printStackTrace();
0696:                        }
0697:                    }
0698:                    activePersistentConnections.put(new Long(dmsg
0699:                            .getPersistentConnectionId()), dmsg.getLogin());
0700:                }
0701:            }
0702:
            /**
             * Schedule a close persistent connection. The call blocks while open/close
             * persistent connection operations are suspended and then counts itself as
             * a pending open/close operation.
             */
            public void scheduleClosePersistentConnection() {
                checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount();
            }
0709:
0710:            private void checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount() {
0711:                synchronized (suspendOpenClosePersistentConnectionSync) {
0712:                    while (suspendOpenClosePersistentConnections > 0) {
0713:                        try {
0714:                            suspendOpenClosePersistentConnectionSync.wait();
0715:                        } catch (InterruptedException e) {
0716:                        }
0717:                    }
0718:                    pendingOpenClosePersistentConnections++;
0719:                }
0720:            }
0721:
0722:            private void decrementOpenClosePersistentConnectionCount() {
0723:                synchronized (suspendOpenClosePersistentConnectionSync) {
0724:                    pendingOpenClosePersistentConnections--;
0725:                    if (pendingOpenClosePersistentConnections < 0) {
0726:                        logger
0727:                                .error("Negative count of pending open/close persistent connections");
0728:                        pendingOpenClosePersistentConnections = 0;
0729:                    }
0730:                    if (suspendOpenClosePersistentConnections == 0)
0731:                        suspendOpenClosePersistentConnectionSync.notifyAll();
0732:                }
0733:            }
0734:
0735:            /**
0736:             * Notify open persistent connection completion. If it failed the connection
0737:             * is removed from the persistentConnections table.
0738:             *
0739:             * @param persistentConnectionId id of the opened persistent connection
0740:             * @param success true if connection opening was successful in which case the
0741:             *          connection is added to the persistent connection list
0742:             */
0743:            public void openPersistentConnectionCompleted(
0744:                    long persistentConnectionId, boolean success) {
0745:                decrementOpenClosePersistentConnectionCount();
0746:                if (!success)
0747:                    synchronized (endOfCurrentPersistentConnections) {
0748:                        activePersistentConnections.remove(new Long(
0749:                                persistentConnectionId));
0750:                        endOfCurrentPersistentConnections.notifyAll();
0751:                    }
0752:            }
0753:
0754:            /**
0755:             * Signals the completion of a persistent connection closing operation.
0756:             *
0757:             * @param persistentConnectionId id of the closed persistent connection
0758:             */
0759:            public void closePersistentConnectionCompleted(
0760:                    long persistentConnectionId) {
0761:                decrementOpenClosePersistentConnectionCount();
0762:                synchronized (endOfCurrentPersistentConnections) {
0763:                    activePersistentConnections.remove(new Long(
0764:                            persistentConnectionId));
0765:                    endOfCurrentPersistentConnections.notifyAll();
0766:                }
0767:            }
0768:
0769:            /**
0770:             * Returns the login associated with a given persistent connection.
0771:             *
0772:             * @param persistentConnectionId the id of the persistent connection
0773:             * @return the associated login
0774:             */
0775:            public String getPersistentConnectionLogin(
0776:                    Long persistentConnectionId) {
0777:                return (String) activePersistentConnections
0778:                        .get(persistentConnectionId);
0779:            }
0780:
0781:            /**
0782:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#hasPersistentConnection(long)
0783:             */
0784:            public boolean hasPersistentConnection(long persistentConnectionId) {
0785:                return activePersistentConnections.contains(new Long(
0786:                        persistentConnectionId));
0787:            }
0788:
            /**
             * Returns a hashtable of all the open persistent connections (and their
             * associated login). Keys are the persistent connection ids (as
             * <code>Long</code>) and values are the logins. The internal table itself
             * is returned, not a copy.
             *
             * @return persistent connection hashtable
             */
            public Hashtable getOpenPersistentConnections() {
                return activePersistentConnections;
            }
0798:
0799:            //
0800:            // 5. Transaction management
0801:            //
0802:
            /**
             * Returns the list of active transactions. The list contains the
             * <code>TransactionMetaData</code> objects registered by
             * <code>begin</code> (not bare transaction ids). The internal list itself
             * is returned, not a copy.
             *
             * @return Returns the active transactions.
             */
            public final List getActiveTransactions() {
                return activeTransactions;
            }
0811:
0812:            /**
0813:             * Retrieve the next transaction identifier
0814:             *
0815:             * @return next transaction identifier
0816:             */
0817:            public long getNextTransactionId() {
0818:                synchronized (transactionsSync) {
0819:                    return transactionId++;
0820:                }
0821:            }
0822:
0823:            /**
0824:             * Increments the savepoint id for un-named savepoints
0825:             *
0826:             * @return the next savepoint Id
0827:             */
0828:            public synchronized int incrementSavepointId() {
0829:                savepointId++;
0830:                return savepointId;
0831:            }
0832:
0833:            /**
0834:             * Initialize the transaction id with the given value (usually retrieved from
0835:             * the recovery log).
0836:             *
0837:             * @param transactionId new current transaction identifier
0838:             */
0839:            public final void initializeTransactionId(long transactionId) {
0840:                synchronized (transactionsSync) {
0841:                    // Use the max operator as a safeguard: IDs may have been delivered but
0842:                    // not logged yet.
0843:                    this .transactionId = Math.max(this .transactionId + 1,
0844:                            transactionId);
0845:                }
0846:            }
0847:
            /**
             * Begin a new transaction with the transaction identifier provided in the
             * transaction meta data parameter. Note that this id must be retrieved
             * beforehand by calling getNextTransactionId(). This method is called from
             * the driver when setAutoCommit(false) is called.
             * <p>
             * The method works in two phases. It first waits on
             * <code>transactionsSync</code> while transactions are suspended and
             * increments the pending transactions counter. It then waits on
             * <code>writesSync</code> while writes are suspended, increments the
             * pending writes counter and adds the transaction to the active
             * transaction list. If transactions got suspended between the two phases,
             * the pending transactions increment is undone and the whole sequence is
             * retried.
             * <p>
             * The value of <code>tm.getTimeout()</code> is handed to
             * <code>Object.wait(long)</code> unchanged, i.e. it is used as a number of
             * milliseconds; a value that is not positive means waiting without time
             * limit.
             *
             * @param tm The transaction marker metadata
             * @param isLazyStart true if this begin is triggered by a lazy transaction
             *          start of a transaction initiated by a remote controller. In that
             *          case, both the transaction suspension and the write suspension
             *          checks below are bypassed.
             * @param request request which triggered this operation
             * @throws SQLException if a wait timed out or was interrupted, or if the
             *           virtual database is rejecting new transactions
             */
            public final void begin(TransactionMetaData tm,
                    boolean isLazyStart, AbstractRequest request)
                    throws SQLException {
                // Check if transactions are suspended
                boolean retry;
                do {
                    retry = false;
                    synchronized (transactionsSync) {
                        // Transactions on persistent connections and lazy starts are never
                        // suspended here
                        if ((suspendTransactions > 0) && !isLazyStart
                                && !tm.isPersistentConnection()) {
                            addSuspendedRequest(request);
                            try {
                                // Wait on transactionSync
                                // NOTE(review): single wait, the suspension condition is not
                                // re-checked in a loop after wake-up - confirm notifiers only
                                // signal once suspendTransactions is back to 0
                                long timeout = tm.getTimeout();
                                if (timeout > 0) {
                                    long start = System.currentTimeMillis();
                                    transactionsSync.wait(timeout);
                                    long end = System.currentTimeMillis();
                                    // Time left once the wait is over; stored back so that the
                                    // later wait on writesSync only uses what remains
                                    long remaining = timeout - (end - start);
                                    if (remaining > 0)
                                        tm.setTimeout(remaining);
                                    else {
                                        String msg = Translate
                                                .get(
                                                        "scheduler.begin.timeout.transactionSync",
                                                        pendingTransactions);
                                        logger.warn(msg);
                                        throw new SQLException(msg);
                                    }
                                } else
                                    transactionsSync.wait();
                            } catch (InterruptedException e) {
                                // NOTE(review): the interrupt status is not restored before
                                // the SQLException is thrown
                                String msg = Translate
                                        .get(
                                                "scheduler.begin.timeout.transactionSync",
                                                pendingTransactions)
                                        + " (" + e + ")";
                                logger.error(msg);
                                throw new SQLException(msg);
                            }
                        }
                        // Checked before pendingTransactions is incremented, so nothing has
                        // to be undone when rejecting the transaction
                        if (vdb != null && vdb.isRejectingNewTransaction())
                            throw new VDBisShuttingDownException(
                                    "VDB is shutting down... can't start a new transaction");

                        pendingTransactions++;

                        if (logger.isDebugEnabled())
                            logger
                                    .debug("Begin scheduled - current pending transactions: "
                                            + pendingTransactions);
                    }

                    // Check if writes are suspended
                    synchronized (writesSync) {
                        /*
                         * If suspendedTransaction changed after we left the block above, we
                         * need to go back and wait there.
                         */
                        synchronized (transactionsSync) {
                            if ((suspendTransactions > 0) && !isLazyStart
                                    && !tm.isPersistentConnection()) {
                                // Undo the increment done in the first phase; 'continue' in a
                                // do/while jumps to the loop condition, which is now true
                                retry = true;
                                pendingTransactions--;
                                checkPendingTransactions();
                                continue;
                            }
                        }
                        if ((suspendWrites > 0) && !isLazyStart
                                && !tm.isPersistentConnection()) {
                            addSuspendedRequest(request);
                            try {
                                // Wait on writesSync
                                // NOTE(review): single wait here as well, see the note on the
                                // transactionsSync wait above
                                long timeout = tm.getTimeout();
                                if (timeout > 0) {
                                    long start = System.currentTimeMillis();
                                    writesSync.wait(timeout);
                                    long end = System.currentTimeMillis();
                                    long remaining = timeout - (end - start);
                                    if (remaining > 0)
                                        tm.setTimeout(remaining);
                                    else {
                                        String msg = Translate
                                                .get(
                                                        "scheduler.begin.timeout.writesSync",
                                                        pendingWrites);
                                        logger.warn(msg);
                                        // The transaction was already counted as pending in the
                                        // first phase: undo it before failing
                                        synchronized (transactionsSync) {
                                            pendingTransactions--;
                                        }
                                        // NOTE(review): called without holding transactionsSync
                                        // here, unlike in the retry path above - confirm this
                                        // is intended
                                        checkPendingTransactions();
                                        throw new SQLException(msg);
                                    }
                                } else
                                    writesSync.wait();
                            } catch (InterruptedException e) {
                                String msg = Translate.get(
                                        "scheduler.begin.timeout.writesSync",
                                        pendingWrites)
                                        + " (" + e + ")";
                                logger.error(msg);
                                // Same clean-up as for the timeout case
                                synchronized (transactionsSync) {
                                    pendingTransactions--;
                                }
                                checkPendingTransactions();
                                throw new SQLException(msg);
                            }
                        }
                        pendingWrites++;

                        if (logger.isDebugEnabled())
                            logger
                                    .debug("Begin scheduled - current pending writes: "
                                            + pendingWrites);

                        // Check if the transaction has not already been started and add it to
                        // the
                        // active transaction list
                        if (activeTransactions.contains(tm)) {
                            logger.error("Trying to start twice transaction "
                                    + tm.getTransactionId());
                        } else
                            activeTransactions.add(tm);
                    }
                } while (retry);
            }
0988:
0989:            /**
0990:             * Notify the completion of a begin command.
0991:             *
0992:             * @param transactionId of the completed begin
0993:             */
0994:            public final void beginCompleted(long transactionId) {
0995:                // Take care of suspended write
0996:                synchronized (writesSync) {
0997:                    pendingWrites--;
0998:                    if (pendingWrites < 0) {
0999:                        logger
1000:                                .error("Negative pending writes detected on begin completion for transaction "
1001:                                        + transactionId);
1002:                        pendingWrites = 0;
1003:                    }
1004:
1005:                    if (logger.isDebugEnabled())
1006:                        logger
1007:                                .debug("Begin completed, remaining pending writes: "
1008:                                        + pendingWrites);
1009:
1010:                    checkPendingWrites();
1011:                }
1012:            }
1013:
1014:            /**
1015:             * Commit a transaction.
1016:             * <p>
1017:             * Calls the implementation specific commitTransaction()
1018:             *
1019:             * @param tm The transaction marker metadata
1020:             * @param emptyTransaction true if we are committing a transaction that did
1021:             *          not execute any query
1022:             * @param dmsg distributed message which triggered this operation
1023:             * @throws SQLException if an error occurs
1024:             * @see #commitTransaction(long)
1025:             */
1026:            public final void commit(TransactionMetaData tm,
1027:                    boolean emptyTransaction, DistributedCommit dmsg)
1028:                    throws SQLException {
1029:                // Check if writes are suspended
1030:                synchronized (writesSync) {
1031:                    if (!activeTransactions.contains(tm))
1032:                        throw new SQLException("Transaction "
1033:                                + tm.getTransactionId()
1034:                                + " is not active, rejecting the commit.");
1035:
1036:                    // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1037:                    if (false) // never suspend a commit
1038:                    {
1039:                        addSuspendedRequest(dmsg);
1040:                        try {
1041:                            // Wait on writesSync
1042:                            long timeout = tm.getTimeout();
1043:                            if (timeout > 0) {
1044:                                long start = System.currentTimeMillis();
1045:                                writesSync.wait(timeout);
1046:                                long end = System.currentTimeMillis();
1047:                                long remaining = timeout - (end - start);
1048:                                if (remaining > 0)
1049:                                    tm.setTimeout(remaining);
1050:                                else {
1051:                                    String msg = Translate
1052:                                            .get(
1053:                                                    "scheduler.commit.timeout.writesSync",
1054:                                                    pendingWrites);
1055:                                    logger.warn(msg);
1056:                                    throw new SQLException(msg);
1057:                                }
1058:                            } else
1059:                                writesSync.wait();
1060:                        } catch (InterruptedException e) {
1061:                            String msg = Translate.get(
1062:                                    "scheduler.commit.timeout.writesSync",
1063:                                    pendingWrites)
1064:                                    + " (" + e + ")";
1065:                            logger.error(msg);
1066:                            throw new SQLException(msg);
1067:                        }
1068:                    }
1069:                    pendingWrites++;
1070:
1071:                    if (logger.isDebugEnabled())
1072:                        logger
1073:                                .debug("Commit scheduled - current pending writes: "
1074:                                        + pendingWrites);
1075:                }
1076:                if (!emptyTransaction)
1077:                    commitTransaction(tm.getTransactionId());
1078:            }
1079:
            /**
             * Implementation specific commit of a transaction given its id. Called by
             * <code>commit</code>, outside of the <code>writesSync</code> lock, and only
             * for transactions that are not empty.
             *
             * @param transactionId the transaction id
             */
            protected abstract void commitTransaction(long transactionId);
1086:
1087:            /**
1088:             * Notify the completion of a commit command.
1089:             *
1090:             * @param tm The transaction marker metadata
1091:             * @param isSuccess true if commit was successful, false otherwise
1092:             */
1093:            public final void commitCompleted(TransactionMetaData tm,
1094:                    boolean isSuccess) {
1095:                boolean transactionIsActive = false;
1096:                synchronized (writesSync) {
1097:                    if (isSuccess) {
1098:                        transactionIsActive = activeTransactions.remove(tm);
1099:                    }
1100:                }
1101:                if (transactionIsActive) {
1102:                    // Take care of suspended transactions
1103:                    synchronized (transactionsSync) {
1104:                        pendingTransactions--;
1105:                        if (pendingTransactions < 0) {
1106:                            logger
1107:                                    .error("Negative pending transactions detected on commit completion for transaction "
1108:                                            + tm.getTransactionId());
1109:                            pendingTransactions = 0;
1110:                        }
1111:
1112:                        if (logger.isDebugEnabled())
1113:                            logger
1114:                                    .debug("Commit completed, remaining pending transactions: "
1115:                                            + pendingTransactions);
1116:
1117:                        checkPendingTransactions();
1118:                    }
1119:                } else if ((isSuccess) && (logger.isDebugEnabled()))
1120:                    logger.debug("Transaction " + tm.getTransactionId()
1121:                            + " has already completed.");
1122:
1123:                // Take care of suspended write
1124:                synchronized (writesSync) {
1125:                    pendingWrites--;
1126:                    if (pendingWrites < 0) {
1127:                        logger
1128:                                .error("Negative pending writes detected on commit completion for transaction"
1129:                                        + tm.getTransactionId());
1130:                        pendingWrites = 0;
1131:                    }
1132:
1133:                    if (logger.isDebugEnabled())
1134:                        logger
1135:                                .debug("Commit completed, remaining pending writes: "
1136:                                        + pendingWrites);
1137:
1138:                    checkPendingWrites();
1139:                }
1140:            }
1141:
1142:            /**
1143:             * Rollback a transaction.
1144:             * <p>
1145:             * Calls the implementation specific rollbackTransaction()
1146:             *
1147:             * @param tm The transaction marker metadata
1148:             * @param dmsg distributed message which triggered this operation
1149:             * @exception SQLException if an error occurs
1150:             * @see #rollbackTransaction(long)
1151:             */
1152:            public final void rollback(TransactionMetaData tm,
1153:                    DistributedRollback dmsg) throws SQLException {
1154:                // Check if writes are suspended
1155:                synchronized (writesSync) {
1156:                    if (!activeTransactions.contains(tm))
1157:                        throw new SQLException("Transaction "
1158:                                + tm.getTransactionId()
1159:                                + " is not active, rejecting the rollback.");
1160:
1161:                    // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1162:                    if (false) // never suspend a rollback
1163:                    {
1164:                        addSuspendedRequest(dmsg);
1165:                        try {
1166:                            // Wait on writesSync
1167:                            long timeout = tm.getTimeout();
1168:                            if (timeout > 0) {
1169:                                long start = System.currentTimeMillis();
1170:                                writesSync.wait(timeout);
1171:                                long end = System.currentTimeMillis();
1172:                                long remaining = timeout - (end - start);
1173:                                if (remaining > 0)
1174:                                    tm.setTimeout(remaining);
1175:                                else {
1176:                                    String msg = Translate
1177:                                            .get(
1178:                                                    "scheduler.rollback.timeout.writesSync",
1179:                                                    pendingWrites);
1180:                                    logger.warn(msg);
1181:                                    throw new SQLException(msg);
1182:                                }
1183:                            } else
1184:                                writesSync.wait();
1185:                        } catch (InterruptedException e) {
1186:                            String msg = Translate.get(
1187:                                    "scheduler.rollback.timeout.writesSync",
1188:                                    pendingWrites)
1189:                                    + " (" + e + ")";
1190:                            logger.error(msg);
1191:                            throw new SQLException(msg);
1192:                        }
1193:                    }
1194:                    pendingWrites++;
1195:
1196:                    if (logger.isDebugEnabled())
1197:                        logger
1198:                                .debug("Rollback scheduled - current pending writes: "
1199:                                        + pendingWrites);
1200:                }
1201:                rollbackTransaction(tm.getTransactionId());
1202:            }
1203:
1204:            /**
1205:             * Rollback a transaction to a savepoint.
1206:             * <p>
1207:             * Calls the implementation specific rollbackTransaction()
1208:             *
1209:             * @param tm transaction marker metadata
1210:             * @param savepointName name of the savepoint
1211:             * @param dmsg distributed message which triggered this operation
1212:             * @throws SQLException if an error occurs
1213:             */
1214:            public final void rollback(TransactionMetaData tm,
1215:                    String savepointName, DistributedRollbackToSavepoint dmsg)
1216:                    throws SQLException {
1217:                // Check if writes are suspended
1218:                synchronized (writesSync) {
1219:                    // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1220:                    if (false) // never suspend a rollback
1221:                    {
1222:                        addSuspendedRequest(dmsg);
1223:                        try {
1224:                            // Wait on writesSync
1225:                            long timeout = tm.getTimeout();
1226:                            if (timeout > 0) {
1227:                                long start = System.currentTimeMillis();
1228:                                writesSync.wait(timeout);
1229:                                long end = System.currentTimeMillis();
1230:                                long remaining = timeout - (end - start);
1231:                                if (remaining > 0)
1232:                                    tm.setTimeout(remaining);
1233:                                else {
1234:                                    String msg = Translate
1235:                                            .get(
1236:                                                    "scheduler.rollbacksavepoint.timeout.writeSync",
1237:                                                    pendingWrites);
1238:                                    logger.warn(msg);
1239:                                    throw new SQLException(msg);
1240:                                }
1241:                            } else
1242:                                writesSync.wait();
1243:                        } catch (InterruptedException e) {
1244:                            String msg = Translate
1245:                                    .get(
1246:                                            "scheduler.rollbacksavepoint.timeout.writeSync",
1247:                                            pendingWrites)
1248:                                    + " (" + e + ")";
1249:                            logger.error(msg);
1250:                            throw new SQLException(msg);
1251:                        }
1252:                    }
1253:                    pendingWrites++;
1254:
1255:                    if (logger.isDebugEnabled())
1256:                        logger.debug("Rollback " + savepointName
1257:                                + " scheduled - current pending writes: "
1258:                                + pendingWrites);
1259:                }
1260:
1261:                this .rollbackTransaction(tm.getTransactionId(), savepointName);
1262:            }
1263:
1264:            /**
1265:             * Rollback a transaction given its id.
1266:             *
1267:             * @param transactionId the transaction id
1268:             */
1269:            protected abstract void rollbackTransaction(long transactionId);
1270:
1271:            /**
1272:             * Rollback a transaction given its id to a savepoint given its name.
1273:             *
1274:             * @param transactionId the transaction id
1275:             * @param savepointName the name of the savepoint
1276:             */
1277:            protected abstract void rollbackTransaction(long transactionId,
1278:                    String savepointName);
1279:
1280:            /**
1281:             * Notify the completion of a rollback command.
1282:             *
1283:             * @param tm The transaction marker metadata
1284:             * @param isSuccess true if commit was successful, false otherwise
1285:             */
1286:            public final void rollbackCompleted(TransactionMetaData tm,
1287:                    boolean isSuccess) {
1288:                boolean transactionIsActive = false;
1289:                synchronized (writesSync) {
1290:                    if (isSuccess) {
1291:                        transactionIsActive = activeTransactions.remove(tm);
1292:                    }
1293:                }
1294:                if (transactionIsActive) {
1295:                    // Take care of suspended transactions
1296:                    synchronized (transactionsSync) {
1297:                        pendingTransactions--;
1298:                        if (pendingTransactions < 0) {
1299:                            logger
1300:                                    .error("Negative pending transactions detected on rollback completion for transaction "
1301:                                            + tm.getTransactionId());
1302:                            pendingTransactions = 0;
1303:                        }
1304:
1305:                        if (logger.isDebugEnabled())
1306:                            logger
1307:                                    .debug("Rollback completed, remaining pending transactions: "
1308:                                            + pendingTransactions);
1309:
1310:                        checkPendingTransactions();
1311:                    }
1312:                } else if ((isSuccess) && (logger.isDebugEnabled()))
1313:                    logger.debug("Transaction " + tm.getTransactionId()
1314:                            + " has already completed.");
1315:
1316:                // Take care of suspended write
1317:                synchronized (writesSync) {
1318:                    pendingWrites--;
1319:
1320:                    if (pendingWrites < 0) {
1321:                        logger
1322:                                .error("Negative pending writes detected on rollback completion for transaction "
1323:                                        + tm.getTransactionId());
1324:                        pendingWrites = 0;
1325:                    }
1326:
1327:                    if (logger.isDebugEnabled())
1328:                        logger
1329:                                .debug("Rollback completed, remaining pending writes: "
1330:                                        + pendingWrites);
1331:
1332:                    checkPendingWrites();
1333:                }
1334:            }
1335:
1336:            /**
1337:             * Set an unnamed savepoint.
1338:             * <p>
1339:             * Calls the implementation specific setSavepointTransaction()
1340:             *
1341:             * @param tm transaction marker metadata
1342:             * @return savepoint Id
1343:             * @throws SQLException if an error occurs
1344:             */
1345:            public final int setSavepoint(TransactionMetaData tm)
1346:                    throws SQLException {
1347:                // Check if writes are suspended
1348:                synchronized (writesSync) {
1349:                    if (suspendWrites > 0) {
1350:                        try {
1351:                            // Wait on writesSync
1352:                            long timeout = tm.getTimeout();
1353:                            if (timeout > 0) {
1354:                                long start = System.currentTimeMillis();
1355:                                writesSync.wait(timeout);
1356:                                long end = System.currentTimeMillis();
1357:                                long remaining = timeout - (end - start);
1358:                                if (remaining > 0)
1359:                                    tm.setTimeout(remaining);
1360:                                else {
1361:                                    String msg = Translate
1362:                                            .get(
1363:                                                    "scheduler.setsavepoint.timeout.writeSync",
1364:                                                    pendingWrites);
1365:                                    logger.warn(msg);
1366:                                    throw new SQLException(msg);
1367:                                }
1368:                            } else
1369:                                writesSync.wait();
1370:                        } catch (InterruptedException e) {
1371:                            String msg = Translate.get(
1372:                                    "scheduler.setsavepoint.timeout.writeSync",
1373:                                    pendingWrites)
1374:                                    + " (" + e + ")";
1375:                            logger.error(msg);
1376:                            throw new SQLException(msg);
1377:                        }
1378:                    }
1379:                    pendingWrites++;
1380:
1381:                    if (logger.isDebugEnabled())
1382:                        logger
1383:                                .debug("Set savepoint scheduled - current pending writes: "
1384:                                        + pendingWrites);
1385:                }
1386:
1387:                int savepointId = this .incrementSavepointId();
1388:                this .setSavepointTransaction(tm.getTransactionId(), String
1389:                        .valueOf(savepointId));
1390:                return savepointId;
1391:            }
1392:
1393:            /**
1394:             * Set a named savepoint.
1395:             * <p>
1396:             * Calls the implementation specific setSavepointTransaction()
1397:             *
1398:             * @param tm transaction marker metadata
1399:             * @param name name of the savepoint
1400:             * @param dmsg distributed message which triggered this operation
1401:             * @throws SQLException if an error occurs
1402:             */
1403:            public final void setSavepoint(TransactionMetaData tm, String name,
1404:                    DistributedSetSavepoint dmsg) throws SQLException {
1405:                // Check if writes are suspended
1406:                synchronized (writesSync) {
1407:                    if (suspendWrites > 0) {
1408:                        addSuspendedRequest(dmsg);
1409:                        try {
1410:                            // Wait on writesSync
1411:                            long timeout = tm.getTimeout();
1412:                            if (timeout > 0) {
1413:                                long start = System.currentTimeMillis();
1414:                                writesSync.wait(timeout);
1415:                                long end = System.currentTimeMillis();
1416:                                long remaining = timeout - (end - start);
1417:                                if (remaining > 0)
1418:                                    tm.setTimeout(remaining);
1419:                                else {
1420:                                    String msg = Translate
1421:                                            .get(
1422:                                                    "scheduler.setsavepoint.timeout.writeSync",
1423:                                                    pendingWrites);
1424:                                    logger.warn(msg);
1425:                                    throw new SQLException(msg);
1426:                                }
1427:                            } else
1428:                                writesSync.wait();
1429:                        } catch (InterruptedException e) {
1430:                            String msg = Translate.get(
1431:                                    "scheduler.setsavepoint.timeout.writeSync",
1432:                                    pendingWrites)
1433:                                    + " (" + e + ")";
1434:                            logger.error(msg);
1435:                            throw new SQLException(msg);
1436:                        }
1437:                    }
1438:                    pendingWrites++;
1439:
1440:                    if (logger.isDebugEnabled())
1441:                        logger.debug("Set savepoint " + name
1442:                                + " scheduled - current pending writes: "
1443:                                + pendingWrites);
1444:                }
1445:
1446:                this .setSavepointTransaction(tm.getTransactionId(), name);
1447:            }
1448:
1449:            /**
1450:             * Set a savepoint given its name to a transaction given its id.
1451:             *
1452:             * @param transactionId the transaction id
1453:             * @param name the name of the savepoint
1454:             */
1455:            protected abstract void setSavepointTransaction(long transactionId,
1456:                    String name);
1457:
1458:            /**
1459:             * Release a savepoint.
1460:             * <p>
1461:             * Calls the implementation specific releaseSavepointTransaction()
1462:             *
1463:             * @param tm transaction marker metadata
1464:             * @param name name of the savepoint
1465:             * @param dmsg distributed message which triggered this operation
1466:             * @throws SQLException if an error occurs
1467:             */
1468:            public final void releaseSavepoint(TransactionMetaData tm,
1469:                    String name, DistributedReleaseSavepoint dmsg)
1470:                    throws SQLException {
1471:                // Check if writes are suspended
1472:                synchronized (writesSync) {
1473:                    if (suspendWrites > 0) {
1474:                        addSuspendedRequest(dmsg);
1475:                        try {
1476:                            // Wait on writesSync
1477:                            long timeout = tm.getTimeout();
1478:                            if (timeout > 0) {
1479:                                long start = System.currentTimeMillis();
1480:                                writesSync.wait(timeout);
1481:                                long end = System.currentTimeMillis();
1482:                                long remaining = timeout - (end - start);
1483:                                if (remaining > 0)
1484:                                    tm.setTimeout(remaining);
1485:                                else {
1486:                                    String msg = Translate
1487:                                            .get(
1488:                                                    "scheduler.releasesavepoint.timeout.writeSync",
1489:                                                    pendingWrites);
1490:                                    logger.warn(msg);
1491:                                    throw new SQLException(msg);
1492:                                }
1493:                            } else
1494:                                writesSync.wait();
1495:                        } catch (InterruptedException e) {
1496:                            String msg = Translate
1497:                                    .get(
1498:                                            "scheduler.releasesavepoint.timeout.writeSync",
1499:                                            pendingWrites)
1500:                                    + " (" + e + ")";
1501:                            logger.error(msg);
1502:                            throw new SQLException(msg);
1503:                        }
1504:                    }
1505:                    pendingWrites++;
1506:
1507:                    if (logger.isDebugEnabled())
1508:                        logger.debug("Release savepoint " + name
1509:                                + " scheduled - current pending writes: "
1510:                                + pendingWrites);
1511:                }
1512:
1513:                this .releaseSavepointTransaction(tm.getTransactionId(), name);
1514:            }
1515:
1516:            /**
1517:             * Release a savepoint given its name from a transaction given its id.
1518:             *
1519:             * @param transactionId the transaction id
1520:             * @param name the name of the savepoint
1521:             */
1522:            protected abstract void releaseSavepointTransaction(
1523:                    long transactionId, String name);
1524:
1525:            /**
1526:             * Notify the conpletion of a savepoint action.
1527:             *
1528:             * @param transactionId the transaction identifier
1529:             */
1530:            public final void savepointCompleted(long transactionId) {
1531:                synchronized (writesSync) {
1532:                    pendingWrites--;
1533:
1534:                    if (pendingWrites < 0) {
1535:                        logger
1536:                                .error("Negative pending writes detected on savepoint completion for transaction"
1537:                                        + transactionId);
1538:                        pendingWrites = 0;
1539:                    }
1540:
1541:                    if (logger.isDebugEnabled())
1542:                        logger
1543:                                .debug("Savepoint completed, remaining pending writes: "
1544:                                        + pendingWrites);
1545:
1546:                    checkPendingWrites();
1547:                }
1548:            }
1549:
1550:            //
1551:            // 6. Checkpoint management
1552:            //
1553:
1554:            /**
1555:             * Resume new transactions that were suspended by
1556:             * suspendNewTransactionsForCheckpoint().
1557:             *
1558:             * @see #suspendNewTransactions()
1559:             */
1560:            public final void resumeNewTransactions() {
1561:                if (logger.isDebugEnabled())
1562:                    logger.debug("Resuming new transactions");
1563:
1564:                synchronized (transactionsSync) {
1565:                    suspendTransactions--;
1566:                    if (suspendTransactions < 0) {
1567:                        suspendTransactions = 0;
1568:                        logger
1569:                                .error("Unexpected negative suspendedTransactions in AbstractScheduler.resumeNewTransactions()");
1570:                    }
1571:                    if (suspendTransactions == 0) {
1572:                        // Wake up all pending begin statements
1573:                        transactionsSync.notifyAll();
1574:                    }
1575:                }
1576:            }
1577:
1578:            /**
1579:             * Suspend all calls to begin() until until resumeWrites() is called. This
1580:             * method does not block and returns immediately. To synchronize on suspended
1581:             * writes completion, you must call waitForSuspendedWritesToComplete().
1582:             * <p>
1583:             * New transactions remain suspended until resumeNewTransactions() is called.
1584:             *
1585:             * @see #resumeNewTransactions()
1586:             * @see #waitForSuspendedTransactionsToComplete()
1587:             */
1588:            public final void suspendNewTransactions() {
1589:                if (logger.isDebugEnabled())
1590:                    logger.debug("Suspending new transactions");
1591:
1592:                synchronized (transactionsSync) {
1593:                    suspendTransactions++;
1594:                }
1595:            }
1596:
1597:            /**
1598:             * Suspend all calls to begin() until until resumeWrites() and
1599:             * resumeNewTransactions are called. This method does not block and returns
1600:             * immediately. To synchronize on suspended writes completion, you must call
1601:             * waitForSuspendedWritesToComplete(). Suspending writes and transactions is
1602:             * done atomically in order to close a window in begin().
1603:             * <p>
1604:             * New transactions remain suspended until resumeNewTransactions() and
1605:             * resumeWrites are called.
1606:             *
1607:             * @see #resumeNewTransactions()
1608:             * @see #waitForSuspendedTransactionsToComplete()
1609:             */
1610:            public void suspendNewTransactionsAndWrites() {
1611:                if (logger.isDebugEnabled())
1612:                    logger.debug("Suspending new transactions and writes");
1613:
1614:                synchronized (writesSync) {
1615:                    synchronized (transactionsSync) {
1616:                        suspendTransactions++;
1617:                        suspendWrites++;
1618:                    }
1619:                }
1620:            }
1621:
1622:            /**
1623:             * Wait for suspended transactions to complete. Returns as soon as number of
1624:             * pending transactions has reached 0.
1625:             *
1626:             * @throws SQLException if an error occured during wait
1627:             */
1628:            public void waitForSuspendedTransactionsToComplete()
1629:                    throws SQLException {
1630:                synchronized (transactionsSync) {
1631:                    if (pendingTransactions == 0) {
1632:                        if (logger.isDebugEnabled())
1633:                            logger.debug("All transactions suspended");
1634:                        return;
1635:                    }
1636:                }
1637:
1638:                // Wait for pending transactions to end
1639:                boolean checkForTimeout = waitForSuspendedTransactionsTimeout > 0;
1640:                long waitTime = INITIAL_WAIT_TIME;
1641:                long totalWaitTime = 0;
1642:                long start;
1643:                long realWait;
1644:                while (true) {
1645:                    synchronized (endOfCurrentTransactions) {
1646:                        // Here we have a potential synchronization problem since the last
1647:                        // transaction completion could have happened before we entered this
1648:                        // synchronized block. Therefore we recheck if there is effectively
1649:                        // still pending transactions. If this is not the case, we don't have
1650:                        // to sleep and we can immediately return.
1651:                        if (pendingTransactions == 0) {
1652:                            if (logger.isDebugEnabled())
1653:                                logger.debug("All new transactions suspended");
1654:                            return;
1655:                        }
1656:
1657:                        if (logger.isDebugEnabled())
1658:                            logger.debug("Waiting for " + pendingTransactions
1659:                                    + " transactions to complete.");
1660:
1661:                        try {
1662:                            start = System.currentTimeMillis();
1663:                            endOfCurrentTransactions.wait(waitTime);
1664:                            realWait = System.currentTimeMillis() - start;
1665:                            totalWaitTime += realWait;
1666:                        } catch (InterruptedException e) {
1667:                            String msg = Translate.get(
1668:                                    "scheduler.suspend.transaction.failed", e);
1669:                            logger.error(msg);
1670:                            throw new SQLException(msg);
1671:                        }
1672:                    }
1673:                    synchronized (transactionsSync) {
1674:                        if (pendingTransactions == 0) {
1675:                            checkForTimeout = false;
1676:                            break;
1677:                        }
1678:                        if (logger.isWarnEnabled()
1679:                                && (activeTransactions.size() > 0)) {
1680:                            StringBuffer transactions = new StringBuffer("[");
1681:                            for (Iterator iter = activeTransactions.iterator(); iter
1682:                                    .hasNext();)
1683:                                transactions
1684:                                        .append((transactions.length() > 1 ? ", "
1685:                                                : "")
1686:                                                + ((TransactionMetaData) iter
1687:                                                        .next())
1688:                                                        .getTransactionId());
1689:                            transactions.append("]");
1690:                            logger.warn("Waited for "
1691:                                    + Math.round((totalWaitTime / 1000.0))
1692:                                    + " secs but " + activeTransactions.size()
1693:                                    + " transactions still open: "
1694:                                    + transactions);
1695:                            if (checkForTimeout)
1696:                                logger
1697:                                        .warn("Will wait for "
1698:                                                + Math
1699:                                                        .max(
1700:                                                                0,
1701:                                                                Math
1702:                                                                        .round((waitForSuspendedTransactionsTimeout - totalWaitTime) / 1000.0))
1703:                                                + " secs more and attempt to abort them");
1704:                        }
1705:                        if (checkForTimeout
1706:                                && totalWaitTime >= waitForSuspendedTransactionsTimeout)
1707:                            break;
1708:                        waitTime *= 2;
1709:                        if (checkForTimeout)
1710:                            waitTime = Math.min(waitTime,
1711:                                    waitForSuspendedTransactionsTimeout
1712:                                            - totalWaitTime);
1713:                    }
1714:                }
1715:                if (checkForTimeout
1716:                        && totalWaitTime >= waitForSuspendedTransactionsTimeout) {
1717:                    if (logger.isWarnEnabled())
1718:                        logger
1719:                                .warn("Timeout reached ("
1720:                                        + Math
1721:                                                .round(waitForSuspendedTransactionsTimeout / 1000.0)
1722:                                        + " secs), aborting remaining active transactions");
1723:                    abortRemainingActiveTransactions();
1724:                }
1725:                if (logger.isDebugEnabled())
1726:                    logger.debug("All new transactions suspended");
1727:            }
1728:
1729:            /**
1730:             * Resume the execution of the <em>new write queries</em> that were
1731:             * suspended by <code>suspendNewWrites()</code>.
1732:             *
1733:             * @see #suspendNewWrites()
1734:             */
1735:            public void resumeWrites() {
1736:                if (logger.isDebugEnabled())
1737:                    logger.debug("Resuming writes");
1738:
1739:                synchronized (writesSync) {
1740:                    suspendWrites--;
1741:                    if (suspendWrites < 0) {
1742:                        suspendWrites = 0;
1743:                        logger
1744:                                .error("Unexpected negative suspendedWrites in AbstractScheduler.resumeWrites()");
1745:                    }
1746:                    if (suspendWrites == 0) {
1747:                        // Wake up all waiting writes
1748:                        writesSync.notifyAll();
1749:                    }
1750:                }
1751:            }
1752:
1753:            /**
1754:             * Checks if the write queries are suspended and there is no remaining pending
1755:             * writes. In that case, notify <code>endOcCurrentWrites</code>
1756:             */
1757:            private void checkPendingWrites() {
1758:                synchronized (writesSync) {
1759:                    // If this is the last write to complete and writes are
1760:                    // suspended we have to notify suspendedWrites()
1761:                    if ((suspendWrites > 0) && (pendingWrites == 0)) {
1762:                        synchronized (endOfCurrentWrites) {
1763:                            endOfCurrentWrites.notifyAll();
1764:                        }
1765:                    }
1766:                }
1767:            }
1768:
1769:            /**
1770:             * Checks if the transactions are suspended and that there is no remaining
1771:             * pending transactions. In that case, notify
1772:             * <code>endOfCurrentTransactions</code>
1773:             *
1774:             * @see #suspendNewTransactions()
1775:             */
1776:            private void checkPendingTransactions() {
1777:                synchronized (transactionsSync) {
1778:                    // If it is the last pending transaction to complete and we
1779:                    // are waiting for pending transactions to complete, then wake
1780:                    // up suspendNewTransactionsForCheckpoint()
1781:                    if ((suspendTransactions > 0) && (pendingTransactions == 0)) {
1782:                        synchronized (endOfCurrentTransactions) {
1783:                            endOfCurrentTransactions.notifyAll();
1784:                        }
1785:                    }
1786:                }
1787:            }
1788:
1789:            /**
1790:             * Resume suspended writes, transactions and persistent connections (in this
1791:             * order).
1792:             */
1793:            public void resumeWritesTransactionsAndPersistentConnections() {
1794:                clearSuspendedRequests();
1795:                resumeWrites();
1796:                resumeNewTransactions();
1797:                resumeNewPersistentConnections();
1798:            }
1799:
1800:            /**
1801:             * Suspend all <em>new write queries</em> until resumeWrites() is called.
1802:             * This method does not block and returns immediately. To synchronize on
1803:             * suspended writes completion, you must call
1804:             * waitForSuspendedWritesToComplete().
1805:             *
1806:             * @see #resumeWrites()
1807:             * @see #waitForSuspendedWritesToComplete()
1808:             */
1809:            public void suspendNewWrites() {
1810:                if (logger.isDebugEnabled())
1811:                    logger.debug("Suspending new writes");
1812:
1813:                synchronized (writesSync) {
1814:                    suspendWrites++;
1815:                }
1816:            }
1817:
1818:            /**
1819:             * @return Returns the suspendedWrites.
1820:             */
1821:            public boolean isSuspendedWrites() {
1822:                return suspendWrites > 0;
1823:            }
1824:
1825:            /**
1826:             * Wait for suspended writes to complete. Returns as soon as number of pending
1827:             * writes has reached 0.
1828:             *
1829:             * @throws SQLException if an error occured during wait
1830:             */
1831:            public void waitForSuspendedWritesToComplete() throws SQLException {
1832:                synchronized (writesSync) {
1833:                    if (pendingWrites == 0) {
1834:                        if (logger.isDebugEnabled())
1835:                            logger.debug("All writes suspended");
1836:                        return;
1837:                    }
1838:                }
1839:
1840:                long waitTime = INITIAL_WAIT_TIME;
1841:                while (true) {
1842:                    synchronized (endOfCurrentWrites) {
1843:                        // Here we have a potential synchronization problem since the last
1844:                        // write completion could have happened before we entered this
1845:                        // synchronized block. Therefore we recheck if there is effectively
1846:                        // still pending writes. If this is not the case, we don't have
1847:                        // to sleep and we can immediately return.
1848:                        if (pendingWrites == 0) {
1849:                            if (logger.isDebugEnabled())
1850:                                logger.debug("All writes suspended");
1851:                            return;
1852:                        }
1853:
1854:                        if (logger.isDebugEnabled())
1855:                            logger.debug("Wait for " + pendingWrites
1856:                                    + " writes to complete.");
1857:
1858:                        // Wait for pending writes to end
1859:                        try {
1860:                            endOfCurrentWrites.wait(waitTime);
1861:                        } catch (InterruptedException e) {
1862:                            String msg = Translate.get(
1863:                                    "scheduler.suspend.writes.failed", e);
1864:                            logger.error(msg);
1865:                            throw new SQLException(msg);
1866:                        }
1867:                    }
1868:                    synchronized (writesSync) {
1869:                        if (pendingWrites == 0)
1870:                            break;
1871:                        else {
1872:                            logger.warn("Waiting for " + pendingWrites
1873:                                    + " pending writes");
1874:                            waitTime *= 2;
1875:                        }
1876:                    }
1877:                }
1878:
1879:                if (logger.isDebugEnabled())
1880:                    logger.debug("All writes suspended");
1881:            }
1882:
1883:            /**
1884:             * Resumes opening and closing of persistent connections.
1885:             */
1886:            public void resumeOpenClosePersistentConnection() {
1887:                synchronized (suspendOpenClosePersistentConnectionSync) {
1888:                    suspendOpenClosePersistentConnections--;
1889:                    if (suspendOpenClosePersistentConnections == 0)
1890:                        suspendOpenClosePersistentConnectionSync.notifyAll();
1891:                }
1892:            }
1893:
1894:            /**
1895:             * Resume new persistent connections creations that were suspended by
1896:             * suspendNewPersistentConnections().
1897:             *
1898:             * @see #suspendNewPersistentConnections()
1899:             */
1900:            public final void resumeNewPersistentConnections() {
1901:                if (logger.isDebugEnabled())
1902:                    logger.debug("Resuming new persistent connections");
1903:
1904:                synchronized (persistentConnectionsSync) {
1905:                    suspendNewPersistentConnections--;
1906:                    if (suspendNewPersistentConnections < 0) {
1907:                        suspendNewPersistentConnections = 0;
1908:                        logger
1909:                                .error("Unexpected negative suspendedPersistentConnections in AbstractScheduler.resumeNewPersistentConnections()");
1910:                    }
1911:                    if (suspendNewPersistentConnections == 0) {
1912:                        // Wake up all pending persistent connections creation
1913:                        persistentConnectionsSync.notifyAll();
1914:                    }
1915:                }
1916:            }
1917:
1918:            /**
1919:             * Suspends open and closing of persistent connections.
1920:             *
1921:             * @see org.continuent.sequoia.controller.requestmanager.RequestManager#closePersistentConnection(String,
1922:             *      long)
1923:             */
1924:            public void suspendOpenClosePersistentConnection() {
1925:                synchronized (suspendOpenClosePersistentConnectionSync) {
1926:                    suspendOpenClosePersistentConnections++;
1927:                }
1928:            }
1929:
1930:            /**
1931:             * Suspend all new persistent connections creation. This method does not block
1932:             * and returns immediately. New connections remain suspended until
1933:             * resumeNewPersistentConnections() is called.
1934:             *
1935:             * @see #resumeNewPersistentConnections()
1936:             * @see #waitForPersistentConnectionsToComplete()
1937:             */
1938:            public void suspendNewPersistentConnections() {
1939:                if (logger.isDebugEnabled())
1940:                    logger.debug("Suspending new persistent connections");
1941:
1942:                synchronized (persistentConnectionsSync) {
1943:                    suspendNewPersistentConnections++;
1944:                }
1945:            }
1946:
1947:            /**
1948:             * Wait for opened persistent connections to complete. Returns as soon as
1949:             * number of pending persistent connections has reached 0.
1950:             *
1951:             * @throws SQLException if an error occurred during wait
1952:             */
1953:            public void waitForPersistentConnectionsToComplete()
1954:                    throws SQLException {
1955:                synchronized (persistentConnectionsSync) {
1956:                    if (activePersistentConnections.isEmpty()) {
1957:                        if (logger.isDebugEnabled())
1958:                            logger.debug("All persistent connections closed");
1959:                        return;
1960:                    }
1961:                }
1962:
1963:                // Wait for persistent connections to end
1964:                boolean checkForTimeout = waitForPersistentConnectionsTimeout > 0;
1965:                long totalWaitTime = 0;
1966:                synchronized (endOfCurrentPersistentConnections) {
1967:                    if (activePersistentConnections.isEmpty()) {
1968:                        if (logger.isDebugEnabled())
1969:                            logger.debug("All persistent connections closed");
1970:                        return;
1971:                    }
1972:
1973:                    if (logger.isDebugEnabled())
1974:                        logger.debug("Waiting for "
1975:                                + activePersistentConnections.size()
1976:                                + " persistent connections to be closed.");
1977:
1978:                    long waitTime = INITIAL_WAIT_TIME;
1979:                    long start;
1980:                    long realWait;
1981:                    while (!activePersistentConnections.isEmpty())
1982:                        try {
1983:                            start = System.currentTimeMillis();
1984:                            endOfCurrentPersistentConnections.wait(waitTime);
1985:                            realWait = System.currentTimeMillis() - start;
1986:                            totalWaitTime += realWait;
1987:                            if (logger.isWarnEnabled()
1988:                                    && (activePersistentConnections.size() > 0)) {
1989:                                logger
1990:                                        .warn("Waited for "
1991:                                                + Math
1992:                                                        .round((totalWaitTime / 1000.0))
1993:                                                + " secs but "
1994:                                                + activePersistentConnections
1995:                                                        .size()
1996:                                                + " persistent connections still open: "
1997:                                                + activePersistentConnections
1998:                                                        .keySet());
1999:                                if (checkForTimeout)
2000:                                    logger
2001:                                            .warn("Will wait for "
2002:                                                    + Math
2003:                                                            .max(
2004:                                                                    0,
2005:                                                                    Math
2006:                                                                            .round((waitForPersistentConnectionsTimeout - totalWaitTime) / 1000.0))
2007:                                                    + " secs more and attempt to close them");
2008:                            }
2009:                            if (checkForTimeout
2010:                                    && totalWaitTime >= waitForPersistentConnectionsTimeout)
2011:                                break;
2012:                            waitTime *= 2;
2013:                            if (checkForTimeout)
2014:                                waitTime = Math.min(waitTime,
2015:                                        waitForPersistentConnectionsTimeout
2016:                                                - totalWaitTime);
2017:                        } catch (InterruptedException e) {
2018:                            String msg = Translate.get(
2019:                                    "scheduler.suspend.transaction.failed", e);
2020:                            logger.error(msg);
2021:                            throw new SQLException(msg);
2022:                        }
2023:                }
2024:                if (checkForTimeout
2025:                        && totalWaitTime >= waitForPersistentConnectionsTimeout) {
2026:                    if (logger.isWarnEnabled())
2027:                        logger
2028:                                .warn("Timeout reached ("
2029:                                        + Math
2030:                                                .round(waitForPersistentConnectionsTimeout / 1000.0)
2031:                                        + " secs), closing remaining active persistent connections");
2032:                    closeRemainingPersistentConnections();
2033:                }
2034:                if (logger.isDebugEnabled())
2035:                    logger.debug("All persistent connections closed");
2036:            }
2037:
2038:            /**
2039:             * Blocks until all pending open/close persistent connections operations are
2040:             * completed.
2041:             */
2042:            public void waitForPendingOpenClosePersistentConnection() {
2043:                synchronized (suspendOpenClosePersistentConnectionSync) {
2044:                    while (pendingOpenClosePersistentConnections > 0) {
2045:                        try {
2046:                            suspendOpenClosePersistentConnectionSync.wait();
2047:                        } catch (InterruptedException ignore) {
2048:                        }
2049:                    }
2050:                }
2051:            }
2052:
2053:            /**
2054:             * Adds an object to the suspended requests list.
2055:             *
2056:             * @param obj suspended request.
2057:             */
2058:            private void addSuspendedRequest(Object obj) {
2059:                synchronized (suspendedRequests) {
2060:                    suspendedRequests.add(obj);
2061:                }
2062:                if (vdb.isDistributed()) { // Distributed virtual database only
2063:                    List totalOrderQueue = vdb.getTotalOrderQueue();
2064:                    synchronized (totalOrderQueue) {
2065:                        totalOrderQueue.notifyAll();
2066:                    }
2067:                }
2068:            }
2069:
2070:            /**
2071:             * Checks if an object is in the suspended requests list.
2072:             *
2073:             * @param obj request to be checked
2074:             * @return true if the request is suspended, false otherwise
2075:             */
2076:            public boolean isSuspendedRequest(Object obj) {
2077:                synchronized (suspendedRequests) {
2078:                    return suspendedRequests.contains(obj);
2079:                }
2080:            }
2081:
2082:            /**
2083:             * Removes all objects from the suspended requests list.
2084:             */
2085:            private void clearSuspendedRequests() {
2086:                synchronized (suspendedRequests) {
2087:                    suspendedRequests.clear();
2088:                }
2089:                if (vdb.isDistributed()) { // Distributed virtual database only
2090:                    List totalOrderQueue = vdb.getTotalOrderQueue();
2091:                    synchronized (totalOrderQueue) {
2092:                        totalOrderQueue.notifyAll();
2093:                    }
2094:                }
2095:            }
2096:
2097:            //
2098:            // 7. Debug/Monitoring
2099:            //
2100:
2101:            protected abstract String getXmlImpl();
2102:
2103:            /**
2104:             * Get information about the Request Scheduler in xml format
2105:             *
2106:             * @return <code>String</code> containing information in xml
2107:             */
2108:            public String getXml() {
2109:                StringBuffer info = new StringBuffer();
2110:                info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">");
2111:                info.append(this .getXmlImpl());
2112:                info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">");
2113:                return info.toString();
2114:            }
2115:
2116:            /**
2117:             * Returns live information on the scheduler
2118:             *
2119:             * @return array of data
2120:             */
2121:            public String[] getSchedulerData() {
2122:                String[] data = new String[7];
2123:                data[0] = String.valueOf(numberRead);
2124:                data[1] = String.valueOf(numberWrite);
2125:                data[2] = String.valueOf(pendingTransactions);
2126:                data[3] = String.valueOf(pendingWrites);
2127:                data[4] = String.valueOf(numberRead + numberWrite);
2128:                data[5] = String.valueOf(suspendTransactions);
2129:                data[6] = String.valueOf(suspendWrites);
2130:                return data;
2131:            }
2132:
2133:            /**
2134:             * @return Returns the numberRead.
2135:             */
2136:            public int getNumberRead() {
2137:                return numberRead;
2138:            }
2139:
2140:            /**
2141:             * @return Returns the numberWrite.
2142:             */
2143:            public int getNumberWrite() {
2144:                return numberWrite;
2145:            }
2146:
2147:            /**
2148:             * @return Returns the pendingTransactions.
2149:             */
2150:            public int getPendingTransactions() {
2151:                return pendingTransactions;
2152:            }
2153:
2154:            private void abortRemainingActiveTransactions() throws SQLException {
2155:                List transactionsToAbort = new ArrayList();
2156:                synchronized (writesSync) {
2157:                    transactionsToAbort.addAll(activeTransactions);
2158:                }
2159:                for (Iterator iter = transactionsToAbort.iterator(); iter
2160:                        .hasNext();) {
2161:                    long transactionId = ((TransactionMetaData) iter.next())
2162:                            .getTransactionId();
2163:                    if (logger.isWarnEnabled())
2164:                        logger.warn("Aborting transaction " + transactionId);
2165:                    vdb.abort(transactionId, true, true);
2166:                }
2167:            }
2168:
2169:            private void closeRemainingPersistentConnections() {
2170:                Map persistentConnectionsToClose = new HashMap();
2171:                synchronized (endOfCurrentPersistentConnections) {
2172:                    persistentConnectionsToClose
2173:                            .putAll(activePersistentConnections);
2174:                }
2175:                for (Iterator iter = persistentConnectionsToClose.keySet()
2176:                        .iterator(); iter.hasNext();) {
2177:                    Long persistentConnectionId = (Long) iter.next();
2178:                    if (logger.isWarnEnabled())
2179:                        logger.warn("Closing persistent connection "
2180:                                + persistentConnectionId);
2181:                    vdb.closePersistentConnection(
2182:                            (String) persistentConnectionsToClose
2183:                                    .get(persistentConnectionId),
2184:                            persistentConnectionId.longValue());
2185:                }
2186:            }
2187:
2188:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.