0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Jean-Bernard van Zuylen, Peter Royal, Damian Arregui.
0023: */package org.continuent.sequoia.controller.scheduler;
0024:
0025: import java.sql.SQLException;
0026: import java.util.ArrayList;
0027: import java.util.HashMap;
0028: import java.util.HashSet;
0029: import java.util.Hashtable;
0030: import java.util.Iterator;
0031: import java.util.List;
0032: import java.util.Map;
0033: import java.util.Set;
0034:
0035: import org.continuent.sequoia.common.exceptions.RollbackException;
0036: import org.continuent.sequoia.common.exceptions.VDBisShuttingDownException;
0037: import org.continuent.sequoia.common.i18n.Translate;
0038: import org.continuent.sequoia.common.log.Trace;
0039: import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0040: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0041: import org.continuent.sequoia.common.xml.XmlComponent;
0042: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0043: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
0044: import org.continuent.sequoia.controller.requests.AbstractRequest;
0045: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0046: import org.continuent.sequoia.controller.requests.SelectRequest;
0047: import org.continuent.sequoia.controller.requests.StoredProcedure;
0048: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0049: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0050: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0051: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0052: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0053: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0054: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0055:
0056: /**
0057: * The Request Scheduler should schedule the request according to a given
0058: * policy.
0059: * <p>
0060: * The requests comes from the Request Controller and are sent later to the next
0061: * ccontroller omponents (cache and load balancer).
0062: *
0063: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0064: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0065: * </a>
0066: * @author <a href="mailto:damian.arregui@continuent.com">Damian Arregui</a>
0067: * @version 1.0
0068: */
0069: public abstract class AbstractScheduler implements XmlComponent {
0070:
0071: //
0072: // How the code is organized ?
0073: //
0074: // 1. Member variables
0075: // 2. Constructor
0076: // 3. Getter/Setter
0077: // 4. Request handling
0078: // 5. Transaction management
0079: // 6. Checkpoint management
0080: // 7. Debug/Monitoring
0081: //
0082:
0083: //
0084: // 1. Member variables
0085: //
0086:
0087: // Request handling
0088: private int suspendWrites = 0;
0089: private int pendingWrites = 0;
0090: private final Object writesSync = new Object();
0091: private final Object endOfCurrentWrites = new Object();
0092: /**
0093: * Read requests only account for SelectRequest objects (stored procedures
0094: * even with a read-only semantic will be in the write requests list).
0095: */
0096: private Map activeReadRequests = new HashMap();
0097:
0098: /**
0099: * Write requests also include stored procedures.
0100: */
0101: private Map activeWriteRequests = new HashMap();
0102: private Set suspendedRequests = new HashSet();
0103:
0104: // Transaction management
0105: private long controllerId = 0;
0106: private long transactionId = 0;
0107: private int savepointId = 0;
0108: private int suspendTransactions = 0;
0109: private int pendingTransactions = 0;
0110: private final Object transactionsSync = new Object();
0111: private final Object endOfCurrentTransactions = new Object();
0112: private List activeTransactions = new ArrayList();
0113: private long waitForSuspendedTransactionsTimeout;
0114:
0115: // Persistent connection management
0116: private int suspendNewPersistentConnections = 0;
0117: private int suspendOpenClosePersistentConnections = 0;
0118: private int pendingOpenClosePersistentConnections = 0;
0119: private final Object persistentConnectionsSync = new Object();
0120: private final Object suspendOpenClosePersistentConnectionSync = new Object();
0121: private final Object endOfCurrentPersistentConnections = new Object();
0122: private long waitForPersistentConnectionsTimeout;
0123:
0124: /**
0125: * List of persistent connections that have been created <br>
0126: * persistentConnectionId (Long) -> vLogin (String)
0127: */
0128: protected Hashtable activePersistentConnections = new Hashtable();
0129:
0130: // Monitoring values
0131: private int numberRead = 0;
0132: private int numberWrite = 0;
0133:
0134: // Other
0135: protected int raidbLevel;
0136: protected int parsingGranularity;
0137:
0138: /** Reference to the associated distributed virtual database */
0139: private VirtualDatabase vdb = null;
0140:
0141: private static final int INITIAL_WAIT_TIME = 15000;
0142: protected static Trace logger = Trace
0143: .getLogger("org.continuent.sequoia.controller.scheduler");
0144:
0145: //
0146: // 2. Constructor
0147: //
0148:
0149: /**
0150: * Default scheduler to assign scheduler RAIDb level, needed granularity and
0151: * SQL macro handling (on the fly instanciation of NOW(), RAND(), ...).
0152: *
0153: * @param raidbLevel RAIDb level of this scheduler
0154: * @param parsingGranularity Parsing granularity needed by the scheduler
0155: * @param vdb virtual database using this scheduler (needed to access its
0156: * total order queue)
0157: */
0158: public AbstractScheduler(int raidbLevel, int parsingGranularity,
0159: VirtualDatabase vdb,
0160: long waitForSuspendedTransactionsTimeout,
0161: long waitForPersistentConnectionsTimeout) {
0162: this .raidbLevel = raidbLevel;
0163: this .parsingGranularity = parsingGranularity;
0164: this .vdb = vdb;
0165: this .waitForSuspendedTransactionsTimeout = waitForSuspendedTransactionsTimeout;
0166: this .waitForPersistentConnectionsTimeout = waitForPersistentConnectionsTimeout;
0167: }
0168:
0169: /**
0170: * Default scheduler to assign scheduler RAIDb level, needed granularity and
0171: * SQL macro handling (on the fly instanciation of NOW(), RAND(), ...).
0172: *
0173: * @param raidbLevel RAIDb level of this scheduler
0174: * @param parsingGranularity Parsing granularity needed by the scheduler
0175: * @deprecated This constructor is used only by unsupported scheduler
0176: * sub-classes.
0177: */
0178: public AbstractScheduler(int raidbLevel, int parsingGranularity) {
0179: this .raidbLevel = raidbLevel;
0180: this .parsingGranularity = parsingGranularity;
0181: this .waitForSuspendedTransactionsTimeout = 5 * 60 * 1000;
0182: this .waitForPersistentConnectionsTimeout = 5 * 60 * 1000;
0183: }
0184:
0185: //
0186: // 3. Getter/Setter methods
0187: //
0188:
0189: /**
0190: * Get the needed query parsing granularity.
0191: *
0192: * @return needed query parsing granularity
0193: */
0194: public final int getParsingGranularity() {
0195: return parsingGranularity;
0196: }
0197:
0198: /**
0199: * Assigns the local controllerId. It is used for generating transactionIds
0200: * for autocommit requests.
0201: *
0202: * @param controllerId for this controller
0203: */
0204: public void setControllerId(long controllerId) {
0205: this .controllerId = controllerId;
0206: }
0207:
0208: /**
0209: * Returns the RAIDbLevel.
0210: *
0211: * @return int
0212: */
0213: public final int getRAIDbLevel() {
0214: return raidbLevel;
0215: }
0216:
0217: /**
0218: * Sets the <code>DatabaseSchema</code> of the current virtual database.
0219: * This is only needed by some schedulers that will have to define their own
0220: * scheduler schema
0221: *
0222: * @param dbs a <code>DatabaseSchema</code> value
0223: * @see org.continuent.sequoia.controller.scheduler.schema.SchedulerDatabaseSchema
0224: */
0225: public void setDatabaseSchema(DatabaseSchema dbs) {
0226: if (logger.isInfoEnabled())
0227: logger.info(Translate
0228: .get("scheduler.doesnt.support.schemas"));
0229: }
0230:
0231: /**
0232: * Merge the given <code>DatabaseSchema</code> with the current one.
0233: *
0234: * @param dbs a <code>DatabaseSchema</code> value
0235: * @see org.continuent.sequoia.controller.scheduler.schema.SchedulerDatabaseSchema
0236: */
0237: public void mergeDatabaseSchema(DatabaseSchema dbs) {
0238: logger.info(Translate.get("scheduler.doesnt.support.schemas"));
0239: }
0240:
0241: //
0242: // 4. Request handling
0243: //
0244:
0245: /**
0246: * Returns the list of active read requests <request id, SelectRequest>.
0247: *
0248: * @return Returns the active read requests.
0249: */
0250: public final Map getActiveReadRequests() {
0251: return activeReadRequests;
0252: }
0253:
0254: /**
0255: * Returns the list of active write requests <request id, AbstractRequest>.
0256: * Write requests can be either StoredProcedure or AbstractWriteRequest
0257: * objects.
0258: *
0259: * @return Returns the active write requests.
0260: */
0261: public final Map getActiveWriteRequests() {
0262: return activeWriteRequests;
0263: }
0264:
0265: /**
0266: * Returns the number of pending writes.
0267: *
0268: * @return int
0269: */
0270: public final int getPendingWrites() {
0271: return pendingWrites;
0272: }
0273:
0274: /**
0275: * Returns true if the given request id is in the active request list.
0276: *
0277: * @param requestId the request unique id
0278: * @return true if the request is active, false otherwise
0279: */
0280: public boolean isActiveRequest(long requestId) {
0281: Long lId = new Long(requestId);
0282: synchronized (activeReadRequests) {
0283: if (activeReadRequests.containsKey(lId))
0284: return true;
0285: }
0286: synchronized (activeWriteRequests) {
0287: return activeWriteRequests.containsKey(lId);
0288: }
0289: }
0290:
0291: /**
0292: * Wait for the completion of the given request id. The method returns as soon
0293: * as the request completion has been notified to the scheduler.
0294: *
0295: * @param requestId the unique request identifier
0296: */
0297: public void waitForRequestCompletion(long requestId) {
0298: Long lId = new Long(requestId);
0299: synchronized (activeReadRequests) {
0300: while (activeReadRequests.containsKey(lId)) {
0301: try {
0302: activeReadRequests.wait();
0303: } catch (InterruptedException ignore) {
0304: }
0305: }
0306: }
0307: synchronized (activeWriteRequests) {
0308: while (activeWriteRequests.containsKey(lId)) {
0309: try {
0310: activeWriteRequests.wait();
0311: } catch (InterruptedException ignore) {
0312: }
0313: }
0314: }
0315: }
0316:
0317: /**
0318: * Schedule a read request
0319: *
0320: * @param request select request to schedule
0321: * @exception SQLException if a timeout occurs or a query with the same id has
0322: * already been scheduled.
0323: */
0324: public void scheduleReadRequest(SelectRequest request)
0325: throws SQLException {
0326: Long id = new Long(request.getId());
0327: synchronized (activeReadRequests) {
0328: if (activeReadRequests.containsKey(id))
0329: throw new SQLException("A query with id " + id
0330: + " has already been scheduled");
0331: activeReadRequests.put(id, request);
0332: }
0333:
0334: // Assign a unique transaction id to requests in autocommit mode as well
0335: if (request.isAutoCommit() && request.isMustBroadcast()) {
0336: long fakeTid = getNextTransactionId();
0337: fakeTid = fakeTid
0338: & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
0339: fakeTid = fakeTid | controllerId;
0340: request.setTransactionId(fakeTid);
0341: }
0342:
0343: try {
0344: scheduleNonSuspendedReadRequest(request);
0345: } catch (SQLException e) {
0346: // Remove query for the active queue if we failed to schedule
0347: synchronized (activeReadRequests) {
0348: activeReadRequests.remove(id);
0349: }
0350: throw e;
0351: }
0352: }
0353:
0354: /**
0355: * Schedule a read request (implementation specific)
0356: *
0357: * @param request Select request to schedule (SQL macros are already handled
0358: * if needed)
0359: * @exception SQLException if a timeout occurs
0360: */
0361: protected abstract void scheduleNonSuspendedReadRequest(
0362: SelectRequest request) throws SQLException;
0363:
0364: /**
0365: * Notify the completion of a read statement.
0366: *
0367: * @param request the completed request
0368: * @throws SQLException if the query was not in the list of active read
0369: * requests (not scheduled)
0370: */
0371: public final void readCompleted(SelectRequest request)
0372: throws SQLException {
0373: Long id = new Long(request.getId());
0374: synchronized (activeReadRequests) {
0375: if (activeReadRequests.remove(id) == null)
0376: throw new SQLException(
0377: "Query "
0378: + id
0379: + " is not in the list of currently scheduled queries");
0380: activeReadRequests.notifyAll();
0381: }
0382: numberRead++;
0383: this .readCompletedNotify(request);
0384: }
0385:
0386: /**
0387: * Notify the completion of a read statement.
0388: *
0389: * @param request the completed request
0390: */
0391: protected abstract void readCompletedNotify(SelectRequest request);
0392:
0393: /**
0394: * Schedule a write request. This method blocks if the writes are suspended.
0395: * Then the number of pending writes is updated and the implementation
0396: * specific scheduleNonSuspendedWriteRequest function is called. SQL macros
0397: * are replaced in the request if the scheduler has needSQLMacroHandling set
0398: * to true.
0399: *
0400: * @param request Write request to schedule
0401: * @exception SQLException if a timeout occurs or a query with the same id has
0402: * already been scheduled.
0403: * @exception RollbackException if an error occurs
0404: * @see #scheduleNonSuspendedWriteRequest(AbstractWriteRequest)
0405: */
0406: public final void scheduleWriteRequest(AbstractWriteRequest request)
0407: throws SQLException, RollbackException {
0408: suspendWriteIfNeededAndAddQueryToActiveRequests(request);
0409: scheduleNonSuspendedWriteRequest(request);
0410:
0411: // Assign a unique transaction id to requests in autocommit mode as well
0412: if (request.isAutoCommit()) {
0413: long fakeTid = getNextTransactionId();
0414: fakeTid = fakeTid
0415: & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
0416: fakeTid = fakeTid | controllerId;
0417: request.setTransactionId(fakeTid);
0418: }
0419: }
0420:
0421: /**
0422: * Schedule a write request (implementation specific). This method blocks
0423: * until the request can be executed.
0424: *
0425: * @param request Write request to schedule (SQL macros are already handled if
0426: * needed)
0427: * @exception SQLException if a timeout occurs
0428: * @exception RollbackException if the transaction must be rollbacked
0429: */
0430: protected abstract void scheduleNonSuspendedWriteRequest(
0431: AbstractWriteRequest request) throws SQLException,
0432: RollbackException;
0433:
0434: /**
0435: * Notify the completion of a write statement.
0436: * <p>
0437: * This method updates the number of pending writes and calls the
0438: * implementation specific notifyWriteCompleted function.
0439: * <p>
0440: * Finally, the suspendWrites() function is notified if needed.
0441: *
0442: * @param request the completed request
0443: * @throws SQLException if the query is not in the list of scheduled queries
0444: * @see #notifyWriteCompleted(AbstractWriteRequest)
0445: * @see #checkPendingWrites()
0446: */
0447: public final void writeCompleted(AbstractWriteRequest request)
0448: throws SQLException {
0449: Long id = new Long(request.getId());
0450:
0451: synchronized (writesSync) {
0452: synchronized (activeWriteRequests) {
0453: if (activeWriteRequests.remove(id) == null)
0454: throw new SQLException(
0455: "Query "
0456: + id
0457: + " is not in the list of currently scheduled queries");
0458:
0459: activeWriteRequests.notifyAll();
0460: }
0461: pendingWrites--;
0462:
0463: if (pendingWrites < 0) {
0464: logger
0465: .error("Negative pending writes detected on write request completion ("
0466: + request + ")");
0467: pendingWrites = 0;
0468: }
0469:
0470: if (logger.isDebugEnabled())
0471: logger
0472: .debug("Write completed, remaining pending writes: "
0473: + pendingWrites);
0474:
0475: notifyWriteCompleted(request);
0476:
0477: checkPendingWrites();
0478: }
0479: numberWrite++;
0480: }
0481:
0482: /**
0483: * Notify the completion of a write statement. This method does not need to be
0484: * synchronized, it is enforced by the caller.
0485: *
0486: * @param request the completed request
0487: * @see #writeCompleted(AbstractWriteRequest)
0488: */
0489: protected abstract void notifyWriteCompleted(
0490: AbstractWriteRequest request);
0491:
0492: /**
0493: * Schedule a write request. This method blocks if the writes are suspended.
0494: * Then the number of pending writes is updated and the implementation
0495: * specific scheduleNonSuspendedWriteRequest function is called. SQL macros
0496: * are replaced in the request if the scheduler has needSQLMacroHandling set
0497: * to true.
0498: *
0499: * @param proc Stored procedure to schedule
0500: * @exception SQLException if a timeout occurs
0501: * @exception RollbackException if an error occurs
0502: * @see #scheduleNonSuspendedStoredProcedure(StoredProcedure)
0503: */
0504: public final void scheduleStoredProcedure(StoredProcedure proc)
0505: throws SQLException, RollbackException {
0506: suspendWriteIfNeededAndAddQueryToActiveRequests(proc);
0507: scheduleNonSuspendedStoredProcedure(proc);
0508:
0509: // Assign a unique transaction id to requests in autocommit mode as well
0510: if (proc.isAutoCommit()) {
0511: long fakeTid = getNextTransactionId();
0512: fakeTid = fakeTid
0513: & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
0514: fakeTid = fakeTid | controllerId;
0515: proc.setTransactionId(fakeTid);
0516: }
0517: }
0518:
0519: /**
0520: * Schedule a write request (implementation specific). This method blocks
0521: * until the request can be executed.
0522: *
0523: * @param proc Stored procedure to schedule
0524: * @exception SQLException if a timeout occurs
0525: * @exception RollbackException if the transaction must be rollbacked
0526: */
0527: protected abstract void scheduleNonSuspendedStoredProcedure(
0528: StoredProcedure proc) throws SQLException,
0529: RollbackException;
0530:
0531: /**
0532: * Notify the completion of a stored procedure.
0533: * <p>
0534: * This method updates the number of pending writes and calls the
0535: * implementation specific notifyStoredProcedureCompleted function.
0536: * <p>
0537: * Finally, the suspendWrites() function is notified if needed.
0538: *
0539: * @param proc the completed stored procedure
0540: * @throws SQLException if the stored procedure was not scheduled before (not
0541: * in the active request list)
0542: * @see #notifyStoredProcedureCompleted(StoredProcedure)
0543: * @see #checkPendingWrites()
0544: */
0545: public final void storedProcedureCompleted(StoredProcedure proc)
0546: throws SQLException {
0547: Long id = new Long(proc.getId());
0548:
0549: synchronized (writesSync) {
0550: synchronized (activeWriteRequests) {
0551: if (activeWriteRequests.remove(id) == null)
0552: throw new SQLException(
0553: "Query "
0554: + id
0555: + " is not in the list of currently scheduled queries");
0556:
0557: activeWriteRequests.notifyAll();
0558: }
0559:
0560: pendingWrites--;
0561:
0562: if (pendingWrites < 0) {
0563: logger
0564: .error("Negative pending writes detected on stored procedure completion ("
0565: + proc + ")");
0566: pendingWrites = 0;
0567: }
0568:
0569: if (logger.isDebugEnabled())
0570: logger
0571: .debug("Stored procedure completed, remaining pending writes: "
0572: + pendingWrites);
0573:
0574: notifyStoredProcedureCompleted(proc);
0575:
0576: checkPendingWrites();
0577: }
0578: numberWrite++;
0579: }
0580:
0581: /**
0582: * Notify the completion of a stored procedure. This method does not need to
0583: * be synchronized, it is enforced by the caller.
0584: *
0585: * @param proc the completed stored procedure
0586: * @see #storedProcedureCompleted(StoredProcedure)
0587: */
0588: protected abstract void notifyStoredProcedureCompleted(
0589: StoredProcedure proc);
0590:
0591: /**
0592: * Suspend write requests if suspendedWrites is active. Adds the request to
0593: * the list of active requests after successful scheduling.
0594: *
0595: * @param request the request to suspend (a write request or a stored
0596: * procedure)
0597: * @throws SQLException if the request timeout has expired or a query with the
0598: * same id has already been scheduled.
0599: */
0600: private void suspendWriteIfNeededAndAddQueryToActiveRequests(
0601: AbstractRequest request) throws SQLException {
0602: Long id = new Long(request.getId());
0603:
0604: synchronized (writesSync) {
0605: if (suspendWrites > 0) {
0606: // Let requests in active transactions to execute since they might
0607: // unblock queries of other transactions.
0608: boolean mustBeSuspended = !request
0609: .isPersistentConnection()
0610: && (request.isAutoCommit() || !activeTransactions
0611: .contains(new TransactionMetaData(
0612: request.getTransactionId(),
0613: 0,
0614: request.getLogin(),
0615: request
0616: .isPersistentConnection(),
0617: request
0618: .getPersistentConnectionId())));
0619:
0620: if (mustBeSuspended) {
0621: addSuspendedRequest(request);
0622: try {
0623: // Wait on writesSync
0624: int timeout = request.getTimeout();
0625: if (timeout > 0) {
0626: long start = System.currentTimeMillis();
0627: long lTimeout = timeout * 1000L;
0628: writesSync.wait(lTimeout);
0629: long end = System.currentTimeMillis();
0630: int remaining = (int) (lTimeout - (end - start));
0631: if (remaining > 0)
0632: request.setTimeout(remaining);
0633: else {
0634: String msg = Translate
0635: .get(
0636: "scheduler.request.timeout",
0637: new String[] {
0638: String
0639: .valueOf(request
0640: .getId()),
0641: String
0642: .valueOf(request
0643: .getTimeout()),
0644: String
0645: .valueOf(pendingWrites) });
0646: logger.warn(msg);
0647: throw new SQLException(msg);
0648: }
0649: } else
0650: this .writesSync.wait();
0651: } catch (InterruptedException e) {
0652: String msg = Translate.get(
0653: "scheduler.request.timeout.failed", e);
0654: logger.warn(msg);
0655: throw new SQLException(msg);
0656: }
0657: }
0658: }
0659:
0660: synchronized (activeWriteRequests) {
0661: if (activeWriteRequests.containsKey(id))
0662: throw new SQLException("A query with id " + id
0663: + " has already been scheduled");
0664:
0665: activeWriteRequests.put(id, request);
0666: }
0667: pendingWrites++;
0668:
0669: if (logger.isDebugEnabled())
0670: logger
0671: .debug("Schedule " + request.getUniqueKey()
0672: + " - Current pending writes: "
0673: + pendingWrites);
0674: }
0675: }
0676:
0677: /**
0678: * Signals the start of a persistent connection opening operation.
0679: *
0680: * @param dmsg distributed message which triggered this operation
0681: */
0682: public void scheduleOpenPersistentConnection(
0683: DistributedOpenPersistentConnection dmsg) {
0684: checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount();
0685:
0686: // Underlying Hashtable is synchronized and we systematically overwrite
0687: // any previous value, it is as fast as checking first.
0688: // Check if persistent connections creation is suspended
0689: synchronized (persistentConnectionsSync) {
0690: if (suspendNewPersistentConnections > 0) {
0691: addSuspendedRequest(dmsg);
0692: try {
0693: persistentConnectionsSync.wait();
0694: } catch (InterruptedException e) {
0695: e.printStackTrace();
0696: }
0697: }
0698: activePersistentConnections.put(new Long(dmsg
0699: .getPersistentConnectionId()), dmsg.getLogin());
0700: }
0701: }
0702:
0703: /**
0704: * Schedule a close persistent connection.
0705: */
0706: public void scheduleClosePersistentConnection() {
0707: checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount();
0708: }
0709:
0710: private void checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount() {
0711: synchronized (suspendOpenClosePersistentConnectionSync) {
0712: while (suspendOpenClosePersistentConnections > 0) {
0713: try {
0714: suspendOpenClosePersistentConnectionSync.wait();
0715: } catch (InterruptedException e) {
0716: }
0717: }
0718: pendingOpenClosePersistentConnections++;
0719: }
0720: }
0721:
0722: private void decrementOpenClosePersistentConnectionCount() {
0723: synchronized (suspendOpenClosePersistentConnectionSync) {
0724: pendingOpenClosePersistentConnections--;
0725: if (pendingOpenClosePersistentConnections < 0) {
0726: logger
0727: .error("Negative count of pending open/close persistent connections");
0728: pendingOpenClosePersistentConnections = 0;
0729: }
0730: if (suspendOpenClosePersistentConnections == 0)
0731: suspendOpenClosePersistentConnectionSync.notifyAll();
0732: }
0733: }
0734:
0735: /**
0736: * Notify open persistent connection completion. If it failed the connection
0737: * is removed from the persistentConnections table.
0738: *
0739: * @param persistentConnectionId id of the opened persistent connection
0740: * @param success true if connection opening was successful in which case the
0741: * connection is added to the persistent connection list
0742: */
0743: public void openPersistentConnectionCompleted(
0744: long persistentConnectionId, boolean success) {
0745: decrementOpenClosePersistentConnectionCount();
0746: if (!success)
0747: synchronized (endOfCurrentPersistentConnections) {
0748: activePersistentConnections.remove(new Long(
0749: persistentConnectionId));
0750: endOfCurrentPersistentConnections.notifyAll();
0751: }
0752: }
0753:
0754: /**
0755: * Signals the completion of a persistent connection closing operation.
0756: *
0757: * @param persistentConnectionId id of the closed persistent connection
0758: */
0759: public void closePersistentConnectionCompleted(
0760: long persistentConnectionId) {
0761: decrementOpenClosePersistentConnectionCount();
0762: synchronized (endOfCurrentPersistentConnections) {
0763: activePersistentConnections.remove(new Long(
0764: persistentConnectionId));
0765: endOfCurrentPersistentConnections.notifyAll();
0766: }
0767: }
0768:
0769: /**
0770: * Returns the login associated with a given persistent connection.
0771: *
0772: * @param persistentConnectionId the id of the persistent connection
0773: * @return the associated login
0774: */
0775: public String getPersistentConnectionLogin(
0776: Long persistentConnectionId) {
0777: return (String) activePersistentConnections
0778: .get(persistentConnectionId);
0779: }
0780:
0781: /**
0782: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#hasPersistentConnection(long)
0783: */
0784: public boolean hasPersistentConnection(long persistentConnectionId) {
0785: return activePersistentConnections.contains(new Long(
0786: persistentConnectionId));
0787: }
0788:
0789: /**
0790: * Returns a hashtable of all the open persistent connections (and their
0791: * associated login).
0792: *
0793: * @return persistent connection hashtable
0794: */
0795: public Hashtable getOpenPersistentConnections() {
0796: return activePersistentConnections;
0797: }
0798:
0799: //
0800: // 5. Transaction management
0801: //
0802:
0803: /**
0804: * Returns the list of active transactions (list contains transaction ids).
0805: *
0806: * @return Returns the active transaction ids.
0807: */
0808: public final List getActiveTransactions() {
0809: return activeTransactions;
0810: }
0811:
0812: /**
0813: * Retrieve the next transaction identifier
0814: *
0815: * @return next transaction identifier
0816: */
0817: public long getNextTransactionId() {
0818: synchronized (transactionsSync) {
0819: return transactionId++;
0820: }
0821: }
0822:
0823: /**
0824: * Increments the savepoint id for un-named savepoints
0825: *
0826: * @return the next savepoint Id
0827: */
0828: public synchronized int incrementSavepointId() {
0829: savepointId++;
0830: return savepointId;
0831: }
0832:
0833: /**
0834: * Initialize the transaction id with the given value (usually retrieved from
0835: * the recovery log).
0836: *
0837: * @param transactionId new current transaction identifier
0838: */
0839: public final void initializeTransactionId(long transactionId) {
0840: synchronized (transactionsSync) {
0841: // Use the max operator as a safeguard: IDs may have been delivered but
0842: // not logged yet.
0843: this .transactionId = Math.max(this .transactionId + 1,
0844: transactionId);
0845: }
0846: }
0847:
0848: /**
0849: * Begin a new transaction with the transaction identifier provided in the
0850: * transaction meta data parameter. Note that this id must retrieve beforehand
0851: * by calling getNextTransactionId(). This method is called from the driver
0852: * when setAutoCommit(false) is called.
0853: *
0854: * @param tm The transaction marker metadata
0855: * @param isLazyStart true if this begin is triggered by a lazy transaction
0856: * start of a transaction initiated by a remote controller. In that
0857: * case, suspended transactions will be ignored (but not suspended
0858: * writes)
0859: * @param request request which triggered this operation
0860: * @throws SQLException if an error occurs
0861: */
0862: public final void begin(TransactionMetaData tm,
0863: boolean isLazyStart, AbstractRequest request)
0864: throws SQLException {
0865: // Check if transactions are suspended
0866: boolean retry;
0867: do {
0868: retry = false;
0869: synchronized (transactionsSync) {
0870: if ((suspendTransactions > 0) && !isLazyStart
0871: && !tm.isPersistentConnection()) {
0872: addSuspendedRequest(request);
0873: try {
0874: // Wait on transactionSync
0875: long timeout = tm.getTimeout();
0876: if (timeout > 0) {
0877: long start = System.currentTimeMillis();
0878: transactionsSync.wait(timeout);
0879: long end = System.currentTimeMillis();
0880: long remaining = timeout - (end - start);
0881: if (remaining > 0)
0882: tm.setTimeout(remaining);
0883: else {
0884: String msg = Translate
0885: .get(
0886: "scheduler.begin.timeout.transactionSync",
0887: pendingTransactions);
0888: logger.warn(msg);
0889: throw new SQLException(msg);
0890: }
0891: } else
0892: transactionsSync.wait();
0893: } catch (InterruptedException e) {
0894: String msg = Translate
0895: .get(
0896: "scheduler.begin.timeout.transactionSync",
0897: pendingTransactions)
0898: + " (" + e + ")";
0899: logger.error(msg);
0900: throw new SQLException(msg);
0901: }
0902: }
0903: if (vdb != null && vdb.isRejectingNewTransaction())
0904: throw new VDBisShuttingDownException(
0905: "VDB is shutting down... can't start a new transaction");
0906:
0907: pendingTransactions++;
0908:
0909: if (logger.isDebugEnabled())
0910: logger
0911: .debug("Begin scheduled - current pending transactions: "
0912: + pendingTransactions);
0913: }
0914:
0915: // Check if writes are suspended
0916: synchronized (writesSync) {
0917: /*
0918: * If suspendedTransaction changed after we left the block above, we
0919: * need to go back and wait there.
0920: */
0921: synchronized (transactionsSync) {
0922: if ((suspendTransactions > 0) && !isLazyStart
0923: && !tm.isPersistentConnection()) {
0924: retry = true;
0925: pendingTransactions--;
0926: checkPendingTransactions();
0927: continue;
0928: }
0929: }
0930: if ((suspendWrites > 0) && !isLazyStart
0931: && !tm.isPersistentConnection()) {
0932: addSuspendedRequest(request);
0933: try {
0934: // Wait on writesSync
0935: long timeout = tm.getTimeout();
0936: if (timeout > 0) {
0937: long start = System.currentTimeMillis();
0938: writesSync.wait(timeout);
0939: long end = System.currentTimeMillis();
0940: long remaining = timeout - (end - start);
0941: if (remaining > 0)
0942: tm.setTimeout(remaining);
0943: else {
0944: String msg = Translate
0945: .get(
0946: "scheduler.begin.timeout.writesSync",
0947: pendingWrites);
0948: logger.warn(msg);
0949: synchronized (transactionsSync) {
0950: pendingTransactions--;
0951: }
0952: checkPendingTransactions();
0953: throw new SQLException(msg);
0954: }
0955: } else
0956: writesSync.wait();
0957: } catch (InterruptedException e) {
0958: String msg = Translate.get(
0959: "scheduler.begin.timeout.writesSync",
0960: pendingWrites)
0961: + " (" + e + ")";
0962: logger.error(msg);
0963: synchronized (transactionsSync) {
0964: pendingTransactions--;
0965: }
0966: checkPendingTransactions();
0967: throw new SQLException(msg);
0968: }
0969: }
0970: pendingWrites++;
0971:
0972: if (logger.isDebugEnabled())
0973: logger
0974: .debug("Begin scheduled - current pending writes: "
0975: + pendingWrites);
0976:
0977: // Check if the transaction has not already been started and add it to
0978: // the
0979: // active transaction list
0980: if (activeTransactions.contains(tm)) {
0981: logger.error("Trying to start twice transaction "
0982: + tm.getTransactionId());
0983: } else
0984: activeTransactions.add(tm);
0985: }
0986: } while (retry);
0987: }
0988:
0989: /**
0990: * Notify the completion of a begin command.
0991: *
0992: * @param transactionId of the completed begin
0993: */
0994: public final void beginCompleted(long transactionId) {
0995: // Take care of suspended write
0996: synchronized (writesSync) {
0997: pendingWrites--;
0998: if (pendingWrites < 0) {
0999: logger
1000: .error("Negative pending writes detected on begin completion for transaction "
1001: + transactionId);
1002: pendingWrites = 0;
1003: }
1004:
1005: if (logger.isDebugEnabled())
1006: logger
1007: .debug("Begin completed, remaining pending writes: "
1008: + pendingWrites);
1009:
1010: checkPendingWrites();
1011: }
1012: }
1013:
1014: /**
1015: * Commit a transaction.
1016: * <p>
1017: * Calls the implementation specific commitTransaction()
1018: *
1019: * @param tm The transaction marker metadata
1020: * @param emptyTransaction true if we are committing a transaction that did
1021: * not execute any query
1022: * @param dmsg distributed message which triggered this operation
1023: * @throws SQLException if an error occurs
1024: * @see #commitTransaction(long)
1025: */
1026: public final void commit(TransactionMetaData tm,
1027: boolean emptyTransaction, DistributedCommit dmsg)
1028: throws SQLException {
1029: // Check if writes are suspended
1030: synchronized (writesSync) {
1031: if (!activeTransactions.contains(tm))
1032: throw new SQLException("Transaction "
1033: + tm.getTransactionId()
1034: + " is not active, rejecting the commit.");
1035:
1036: // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1037: if (false) // never suspend a commit
1038: {
1039: addSuspendedRequest(dmsg);
1040: try {
1041: // Wait on writesSync
1042: long timeout = tm.getTimeout();
1043: if (timeout > 0) {
1044: long start = System.currentTimeMillis();
1045: writesSync.wait(timeout);
1046: long end = System.currentTimeMillis();
1047: long remaining = timeout - (end - start);
1048: if (remaining > 0)
1049: tm.setTimeout(remaining);
1050: else {
1051: String msg = Translate
1052: .get(
1053: "scheduler.commit.timeout.writesSync",
1054: pendingWrites);
1055: logger.warn(msg);
1056: throw new SQLException(msg);
1057: }
1058: } else
1059: writesSync.wait();
1060: } catch (InterruptedException e) {
1061: String msg = Translate.get(
1062: "scheduler.commit.timeout.writesSync",
1063: pendingWrites)
1064: + " (" + e + ")";
1065: logger.error(msg);
1066: throw new SQLException(msg);
1067: }
1068: }
1069: pendingWrites++;
1070:
1071: if (logger.isDebugEnabled())
1072: logger
1073: .debug("Commit scheduled - current pending writes: "
1074: + pendingWrites);
1075: }
1076: if (!emptyTransaction)
1077: commitTransaction(tm.getTransactionId());
1078: }
1079:
1080: /**
1081: * Commit a transaction given its id.
1082: *
1083: * @param transactionId the transaction id
1084: */
1085: protected abstract void commitTransaction(long transactionId);
1086:
1087: /**
1088: * Notify the completion of a commit command.
1089: *
1090: * @param tm The transaction marker metadata
1091: * @param isSuccess true if commit was successful, false otherwise
1092: */
1093: public final void commitCompleted(TransactionMetaData tm,
1094: boolean isSuccess) {
1095: boolean transactionIsActive = false;
1096: synchronized (writesSync) {
1097: if (isSuccess) {
1098: transactionIsActive = activeTransactions.remove(tm);
1099: }
1100: }
1101: if (transactionIsActive) {
1102: // Take care of suspended transactions
1103: synchronized (transactionsSync) {
1104: pendingTransactions--;
1105: if (pendingTransactions < 0) {
1106: logger
1107: .error("Negative pending transactions detected on commit completion for transaction "
1108: + tm.getTransactionId());
1109: pendingTransactions = 0;
1110: }
1111:
1112: if (logger.isDebugEnabled())
1113: logger
1114: .debug("Commit completed, remaining pending transactions: "
1115: + pendingTransactions);
1116:
1117: checkPendingTransactions();
1118: }
1119: } else if ((isSuccess) && (logger.isDebugEnabled()))
1120: logger.debug("Transaction " + tm.getTransactionId()
1121: + " has already completed.");
1122:
1123: // Take care of suspended write
1124: synchronized (writesSync) {
1125: pendingWrites--;
1126: if (pendingWrites < 0) {
1127: logger
1128: .error("Negative pending writes detected on commit completion for transaction"
1129: + tm.getTransactionId());
1130: pendingWrites = 0;
1131: }
1132:
1133: if (logger.isDebugEnabled())
1134: logger
1135: .debug("Commit completed, remaining pending writes: "
1136: + pendingWrites);
1137:
1138: checkPendingWrites();
1139: }
1140: }
1141:
1142: /**
1143: * Rollback a transaction.
1144: * <p>
1145: * Calls the implementation specific rollbackTransaction()
1146: *
1147: * @param tm The transaction marker metadata
1148: * @param dmsg distributed message which triggered this operation
1149: * @exception SQLException if an error occurs
1150: * @see #rollbackTransaction(long)
1151: */
1152: public final void rollback(TransactionMetaData tm,
1153: DistributedRollback dmsg) throws SQLException {
1154: // Check if writes are suspended
1155: synchronized (writesSync) {
1156: if (!activeTransactions.contains(tm))
1157: throw new SQLException("Transaction "
1158: + tm.getTransactionId()
1159: + " is not active, rejecting the rollback.");
1160:
1161: // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1162: if (false) // never suspend a rollback
1163: {
1164: addSuspendedRequest(dmsg);
1165: try {
1166: // Wait on writesSync
1167: long timeout = tm.getTimeout();
1168: if (timeout > 0) {
1169: long start = System.currentTimeMillis();
1170: writesSync.wait(timeout);
1171: long end = System.currentTimeMillis();
1172: long remaining = timeout - (end - start);
1173: if (remaining > 0)
1174: tm.setTimeout(remaining);
1175: else {
1176: String msg = Translate
1177: .get(
1178: "scheduler.rollback.timeout.writesSync",
1179: pendingWrites);
1180: logger.warn(msg);
1181: throw new SQLException(msg);
1182: }
1183: } else
1184: writesSync.wait();
1185: } catch (InterruptedException e) {
1186: String msg = Translate.get(
1187: "scheduler.rollback.timeout.writesSync",
1188: pendingWrites)
1189: + " (" + e + ")";
1190: logger.error(msg);
1191: throw new SQLException(msg);
1192: }
1193: }
1194: pendingWrites++;
1195:
1196: if (logger.isDebugEnabled())
1197: logger
1198: .debug("Rollback scheduled - current pending writes: "
1199: + pendingWrites);
1200: }
1201: rollbackTransaction(tm.getTransactionId());
1202: }
1203:
1204: /**
1205: * Rollback a transaction to a savepoint.
1206: * <p>
1207: * Calls the implementation specific rollbackTransaction()
1208: *
1209: * @param tm transaction marker metadata
1210: * @param savepointName name of the savepoint
1211: * @param dmsg distributed message which triggered this operation
1212: * @throws SQLException if an error occurs
1213: */
1214: public final void rollback(TransactionMetaData tm,
1215: String savepointName, DistributedRollbackToSavepoint dmsg)
1216: throws SQLException {
1217: // Check if writes are suspended
1218: synchronized (writesSync) {
1219: // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1220: if (false) // never suspend a rollback
1221: {
1222: addSuspendedRequest(dmsg);
1223: try {
1224: // Wait on writesSync
1225: long timeout = tm.getTimeout();
1226: if (timeout > 0) {
1227: long start = System.currentTimeMillis();
1228: writesSync.wait(timeout);
1229: long end = System.currentTimeMillis();
1230: long remaining = timeout - (end - start);
1231: if (remaining > 0)
1232: tm.setTimeout(remaining);
1233: else {
1234: String msg = Translate
1235: .get(
1236: "scheduler.rollbacksavepoint.timeout.writeSync",
1237: pendingWrites);
1238: logger.warn(msg);
1239: throw new SQLException(msg);
1240: }
1241: } else
1242: writesSync.wait();
1243: } catch (InterruptedException e) {
1244: String msg = Translate
1245: .get(
1246: "scheduler.rollbacksavepoint.timeout.writeSync",
1247: pendingWrites)
1248: + " (" + e + ")";
1249: logger.error(msg);
1250: throw new SQLException(msg);
1251: }
1252: }
1253: pendingWrites++;
1254:
1255: if (logger.isDebugEnabled())
1256: logger.debug("Rollback " + savepointName
1257: + " scheduled - current pending writes: "
1258: + pendingWrites);
1259: }
1260:
1261: this .rollbackTransaction(tm.getTransactionId(), savepointName);
1262: }
1263:
1264: /**
1265: * Rollback a transaction given its id.
1266: *
1267: * @param transactionId the transaction id
1268: */
1269: protected abstract void rollbackTransaction(long transactionId);
1270:
1271: /**
1272: * Rollback a transaction given its id to a savepoint given its name.
1273: *
1274: * @param transactionId the transaction id
1275: * @param savepointName the name of the savepoint
1276: */
1277: protected abstract void rollbackTransaction(long transactionId,
1278: String savepointName);
1279:
1280: /**
1281: * Notify the completion of a rollback command.
1282: *
1283: * @param tm The transaction marker metadata
1284: * @param isSuccess true if commit was successful, false otherwise
1285: */
1286: public final void rollbackCompleted(TransactionMetaData tm,
1287: boolean isSuccess) {
1288: boolean transactionIsActive = false;
1289: synchronized (writesSync) {
1290: if (isSuccess) {
1291: transactionIsActive = activeTransactions.remove(tm);
1292: }
1293: }
1294: if (transactionIsActive) {
1295: // Take care of suspended transactions
1296: synchronized (transactionsSync) {
1297: pendingTransactions--;
1298: if (pendingTransactions < 0) {
1299: logger
1300: .error("Negative pending transactions detected on rollback completion for transaction "
1301: + tm.getTransactionId());
1302: pendingTransactions = 0;
1303: }
1304:
1305: if (logger.isDebugEnabled())
1306: logger
1307: .debug("Rollback completed, remaining pending transactions: "
1308: + pendingTransactions);
1309:
1310: checkPendingTransactions();
1311: }
1312: } else if ((isSuccess) && (logger.isDebugEnabled()))
1313: logger.debug("Transaction " + tm.getTransactionId()
1314: + " has already completed.");
1315:
1316: // Take care of suspended write
1317: synchronized (writesSync) {
1318: pendingWrites--;
1319:
1320: if (pendingWrites < 0) {
1321: logger
1322: .error("Negative pending writes detected on rollback completion for transaction "
1323: + tm.getTransactionId());
1324: pendingWrites = 0;
1325: }
1326:
1327: if (logger.isDebugEnabled())
1328: logger
1329: .debug("Rollback completed, remaining pending writes: "
1330: + pendingWrites);
1331:
1332: checkPendingWrites();
1333: }
1334: }
1335:
1336: /**
1337: * Set an unnamed savepoint.
1338: * <p>
1339: * Calls the implementation specific setSavepointTransaction()
1340: *
1341: * @param tm transaction marker metadata
1342: * @return savepoint Id
1343: * @throws SQLException if an error occurs
1344: */
1345: public final int setSavepoint(TransactionMetaData tm)
1346: throws SQLException {
1347: // Check if writes are suspended
1348: synchronized (writesSync) {
1349: if (suspendWrites > 0) {
1350: try {
1351: // Wait on writesSync
1352: long timeout = tm.getTimeout();
1353: if (timeout > 0) {
1354: long start = System.currentTimeMillis();
1355: writesSync.wait(timeout);
1356: long end = System.currentTimeMillis();
1357: long remaining = timeout - (end - start);
1358: if (remaining > 0)
1359: tm.setTimeout(remaining);
1360: else {
1361: String msg = Translate
1362: .get(
1363: "scheduler.setsavepoint.timeout.writeSync",
1364: pendingWrites);
1365: logger.warn(msg);
1366: throw new SQLException(msg);
1367: }
1368: } else
1369: writesSync.wait();
1370: } catch (InterruptedException e) {
1371: String msg = Translate.get(
1372: "scheduler.setsavepoint.timeout.writeSync",
1373: pendingWrites)
1374: + " (" + e + ")";
1375: logger.error(msg);
1376: throw new SQLException(msg);
1377: }
1378: }
1379: pendingWrites++;
1380:
1381: if (logger.isDebugEnabled())
1382: logger
1383: .debug("Set savepoint scheduled - current pending writes: "
1384: + pendingWrites);
1385: }
1386:
1387: int savepointId = this .incrementSavepointId();
1388: this .setSavepointTransaction(tm.getTransactionId(), String
1389: .valueOf(savepointId));
1390: return savepointId;
1391: }
1392:
1393: /**
1394: * Set a named savepoint.
1395: * <p>
1396: * Calls the implementation specific setSavepointTransaction()
1397: *
1398: * @param tm transaction marker metadata
1399: * @param name name of the savepoint
1400: * @param dmsg distributed message which triggered this operation
1401: * @throws SQLException if an error occurs
1402: */
1403: public final void setSavepoint(TransactionMetaData tm, String name,
1404: DistributedSetSavepoint dmsg) throws SQLException {
1405: // Check if writes are suspended
1406: synchronized (writesSync) {
1407: if (suspendWrites > 0) {
1408: addSuspendedRequest(dmsg);
1409: try {
1410: // Wait on writesSync
1411: long timeout = tm.getTimeout();
1412: if (timeout > 0) {
1413: long start = System.currentTimeMillis();
1414: writesSync.wait(timeout);
1415: long end = System.currentTimeMillis();
1416: long remaining = timeout - (end - start);
1417: if (remaining > 0)
1418: tm.setTimeout(remaining);
1419: else {
1420: String msg = Translate
1421: .get(
1422: "scheduler.setsavepoint.timeout.writeSync",
1423: pendingWrites);
1424: logger.warn(msg);
1425: throw new SQLException(msg);
1426: }
1427: } else
1428: writesSync.wait();
1429: } catch (InterruptedException e) {
1430: String msg = Translate.get(
1431: "scheduler.setsavepoint.timeout.writeSync",
1432: pendingWrites)
1433: + " (" + e + ")";
1434: logger.error(msg);
1435: throw new SQLException(msg);
1436: }
1437: }
1438: pendingWrites++;
1439:
1440: if (logger.isDebugEnabled())
1441: logger.debug("Set savepoint " + name
1442: + " scheduled - current pending writes: "
1443: + pendingWrites);
1444: }
1445:
1446: this .setSavepointTransaction(tm.getTransactionId(), name);
1447: }
1448:
1449: /**
1450: * Set a savepoint given its name to a transaction given its id.
1451: *
1452: * @param transactionId the transaction id
1453: * @param name the name of the savepoint
1454: */
1455: protected abstract void setSavepointTransaction(long transactionId,
1456: String name);
1457:
1458: /**
1459: * Release a savepoint.
1460: * <p>
1461: * Calls the implementation specific releaseSavepointTransaction()
1462: *
1463: * @param tm transaction marker metadata
1464: * @param name name of the savepoint
1465: * @param dmsg distributed message which triggered this operation
1466: * @throws SQLException if an error occurs
1467: */
1468: public final void releaseSavepoint(TransactionMetaData tm,
1469: String name, DistributedReleaseSavepoint dmsg)
1470: throws SQLException {
1471: // Check if writes are suspended
1472: synchronized (writesSync) {
1473: if (suspendWrites > 0) {
1474: addSuspendedRequest(dmsg);
1475: try {
1476: // Wait on writesSync
1477: long timeout = tm.getTimeout();
1478: if (timeout > 0) {
1479: long start = System.currentTimeMillis();
1480: writesSync.wait(timeout);
1481: long end = System.currentTimeMillis();
1482: long remaining = timeout - (end - start);
1483: if (remaining > 0)
1484: tm.setTimeout(remaining);
1485: else {
1486: String msg = Translate
1487: .get(
1488: "scheduler.releasesavepoint.timeout.writeSync",
1489: pendingWrites);
1490: logger.warn(msg);
1491: throw new SQLException(msg);
1492: }
1493: } else
1494: writesSync.wait();
1495: } catch (InterruptedException e) {
1496: String msg = Translate
1497: .get(
1498: "scheduler.releasesavepoint.timeout.writeSync",
1499: pendingWrites)
1500: + " (" + e + ")";
1501: logger.error(msg);
1502: throw new SQLException(msg);
1503: }
1504: }
1505: pendingWrites++;
1506:
1507: if (logger.isDebugEnabled())
1508: logger.debug("Release savepoint " + name
1509: + " scheduled - current pending writes: "
1510: + pendingWrites);
1511: }
1512:
1513: this .releaseSavepointTransaction(tm.getTransactionId(), name);
1514: }
1515:
1516: /**
1517: * Release a savepoint given its name from a transaction given its id.
1518: *
1519: * @param transactionId the transaction id
1520: * @param name the name of the savepoint
1521: */
1522: protected abstract void releaseSavepointTransaction(
1523: long transactionId, String name);
1524:
1525: /**
1526: * Notify the conpletion of a savepoint action.
1527: *
1528: * @param transactionId the transaction identifier
1529: */
1530: public final void savepointCompleted(long transactionId) {
1531: synchronized (writesSync) {
1532: pendingWrites--;
1533:
1534: if (pendingWrites < 0) {
1535: logger
1536: .error("Negative pending writes detected on savepoint completion for transaction"
1537: + transactionId);
1538: pendingWrites = 0;
1539: }
1540:
1541: if (logger.isDebugEnabled())
1542: logger
1543: .debug("Savepoint completed, remaining pending writes: "
1544: + pendingWrites);
1545:
1546: checkPendingWrites();
1547: }
1548: }
1549:
1550: //
1551: // 6. Checkpoint management
1552: //
1553:
1554: /**
1555: * Resume new transactions that were suspended by
1556: * suspendNewTransactionsForCheckpoint().
1557: *
1558: * @see #suspendNewTransactions()
1559: */
1560: public final void resumeNewTransactions() {
1561: if (logger.isDebugEnabled())
1562: logger.debug("Resuming new transactions");
1563:
1564: synchronized (transactionsSync) {
1565: suspendTransactions--;
1566: if (suspendTransactions < 0) {
1567: suspendTransactions = 0;
1568: logger
1569: .error("Unexpected negative suspendedTransactions in AbstractScheduler.resumeNewTransactions()");
1570: }
1571: if (suspendTransactions == 0) {
1572: // Wake up all pending begin statements
1573: transactionsSync.notifyAll();
1574: }
1575: }
1576: }
1577:
1578: /**
1579: * Suspend all calls to begin() until until resumeWrites() is called. This
1580: * method does not block and returns immediately. To synchronize on suspended
1581: * writes completion, you must call waitForSuspendedWritesToComplete().
1582: * <p>
1583: * New transactions remain suspended until resumeNewTransactions() is called.
1584: *
1585: * @see #resumeNewTransactions()
1586: * @see #waitForSuspendedTransactionsToComplete()
1587: */
1588: public final void suspendNewTransactions() {
1589: if (logger.isDebugEnabled())
1590: logger.debug("Suspending new transactions");
1591:
1592: synchronized (transactionsSync) {
1593: suspendTransactions++;
1594: }
1595: }
1596:
1597: /**
1598: * Suspend all calls to begin() until until resumeWrites() and
1599: * resumeNewTransactions are called. This method does not block and returns
1600: * immediately. To synchronize on suspended writes completion, you must call
1601: * waitForSuspendedWritesToComplete(). Suspending writes and transactions is
1602: * done atomically in order to close a window in begin().
1603: * <p>
1604: * New transactions remain suspended until resumeNewTransactions() and
1605: * resumeWrites are called.
1606: *
1607: * @see #resumeNewTransactions()
1608: * @see #waitForSuspendedTransactionsToComplete()
1609: */
1610: public void suspendNewTransactionsAndWrites() {
1611: if (logger.isDebugEnabled())
1612: logger.debug("Suspending new transactions and writes");
1613:
1614: synchronized (writesSync) {
1615: synchronized (transactionsSync) {
1616: suspendTransactions++;
1617: suspendWrites++;
1618: }
1619: }
1620: }
1621:
1622: /**
1623: * Wait for suspended transactions to complete. Returns as soon as number of
1624: * pending transactions has reached 0.
1625: *
1626: * @throws SQLException if an error occured during wait
1627: */
1628: public void waitForSuspendedTransactionsToComplete()
1629: throws SQLException {
1630: synchronized (transactionsSync) {
1631: if (pendingTransactions == 0) {
1632: if (logger.isDebugEnabled())
1633: logger.debug("All transactions suspended");
1634: return;
1635: }
1636: }
1637:
1638: // Wait for pending transactions to end
1639: boolean checkForTimeout = waitForSuspendedTransactionsTimeout > 0;
1640: long waitTime = INITIAL_WAIT_TIME;
1641: long totalWaitTime = 0;
1642: long start;
1643: long realWait;
1644: while (true) {
1645: synchronized (endOfCurrentTransactions) {
1646: // Here we have a potential synchronization problem since the last
1647: // transaction completion could have happened before we entered this
1648: // synchronized block. Therefore we recheck if there is effectively
1649: // still pending transactions. If this is not the case, we don't have
1650: // to sleep and we can immediately return.
1651: if (pendingTransactions == 0) {
1652: if (logger.isDebugEnabled())
1653: logger.debug("All new transactions suspended");
1654: return;
1655: }
1656:
1657: if (logger.isDebugEnabled())
1658: logger.debug("Waiting for " + pendingTransactions
1659: + " transactions to complete.");
1660:
1661: try {
1662: start = System.currentTimeMillis();
1663: endOfCurrentTransactions.wait(waitTime);
1664: realWait = System.currentTimeMillis() - start;
1665: totalWaitTime += realWait;
1666: } catch (InterruptedException e) {
1667: String msg = Translate.get(
1668: "scheduler.suspend.transaction.failed", e);
1669: logger.error(msg);
1670: throw new SQLException(msg);
1671: }
1672: }
1673: synchronized (transactionsSync) {
1674: if (pendingTransactions == 0) {
1675: checkForTimeout = false;
1676: break;
1677: }
1678: if (logger.isWarnEnabled()
1679: && (activeTransactions.size() > 0)) {
1680: StringBuffer transactions = new StringBuffer("[");
1681: for (Iterator iter = activeTransactions.iterator(); iter
1682: .hasNext();)
1683: transactions
1684: .append((transactions.length() > 1 ? ", "
1685: : "")
1686: + ((TransactionMetaData) iter
1687: .next())
1688: .getTransactionId());
1689: transactions.append("]");
1690: logger.warn("Waited for "
1691: + Math.round((totalWaitTime / 1000.0))
1692: + " secs but " + activeTransactions.size()
1693: + " transactions still open: "
1694: + transactions);
1695: if (checkForTimeout)
1696: logger
1697: .warn("Will wait for "
1698: + Math
1699: .max(
1700: 0,
1701: Math
1702: .round((waitForSuspendedTransactionsTimeout - totalWaitTime) / 1000.0))
1703: + " secs more and attempt to abort them");
1704: }
1705: if (checkForTimeout
1706: && totalWaitTime >= waitForSuspendedTransactionsTimeout)
1707: break;
1708: waitTime *= 2;
1709: if (checkForTimeout)
1710: waitTime = Math.min(waitTime,
1711: waitForSuspendedTransactionsTimeout
1712: - totalWaitTime);
1713: }
1714: }
1715: if (checkForTimeout
1716: && totalWaitTime >= waitForSuspendedTransactionsTimeout) {
1717: if (logger.isWarnEnabled())
1718: logger
1719: .warn("Timeout reached ("
1720: + Math
1721: .round(waitForSuspendedTransactionsTimeout / 1000.0)
1722: + " secs), aborting remaining active transactions");
1723: abortRemainingActiveTransactions();
1724: }
1725: if (logger.isDebugEnabled())
1726: logger.debug("All new transactions suspended");
1727: }
1728:
1729: /**
1730: * Resume the execution of the <em>new write queries</em> that were
1731: * suspended by <code>suspendNewWrites()</code>.
1732: *
1733: * @see #suspendNewWrites()
1734: */
1735: public void resumeWrites() {
1736: if (logger.isDebugEnabled())
1737: logger.debug("Resuming writes");
1738:
1739: synchronized (writesSync) {
1740: suspendWrites--;
1741: if (suspendWrites < 0) {
1742: suspendWrites = 0;
1743: logger
1744: .error("Unexpected negative suspendedWrites in AbstractScheduler.resumeWrites()");
1745: }
1746: if (suspendWrites == 0) {
1747: // Wake up all waiting writes
1748: writesSync.notifyAll();
1749: }
1750: }
1751: }
1752:
1753: /**
1754: * Checks if the write queries are suspended and there is no remaining pending
1755: * writes. In that case, notify <code>endOcCurrentWrites</code>
1756: */
1757: private void checkPendingWrites() {
1758: synchronized (writesSync) {
1759: // If this is the last write to complete and writes are
1760: // suspended we have to notify suspendedWrites()
1761: if ((suspendWrites > 0) && (pendingWrites == 0)) {
1762: synchronized (endOfCurrentWrites) {
1763: endOfCurrentWrites.notifyAll();
1764: }
1765: }
1766: }
1767: }
1768:
1769: /**
1770: * Checks if the transactions are suspended and that there is no remaining
1771: * pending transactions. In that case, notify
1772: * <code>endOfCurrentTransactions</code>
1773: *
1774: * @see #suspendNewTransactions()
1775: */
1776: private void checkPendingTransactions() {
1777: synchronized (transactionsSync) {
1778: // If it is the last pending transaction to complete and we
1779: // are waiting for pending transactions to complete, then wake
1780: // up suspendNewTransactionsForCheckpoint()
1781: if ((suspendTransactions > 0) && (pendingTransactions == 0)) {
1782: synchronized (endOfCurrentTransactions) {
1783: endOfCurrentTransactions.notifyAll();
1784: }
1785: }
1786: }
1787: }
1788:
1789: /**
1790: * Resume suspended writes, transactions and persistent connections (in this
1791: * order).
1792: */
1793: public void resumeWritesTransactionsAndPersistentConnections() {
1794: clearSuspendedRequests();
1795: resumeWrites();
1796: resumeNewTransactions();
1797: resumeNewPersistentConnections();
1798: }
1799:
1800: /**
1801: * Suspend all <em>new write queries</em> until resumeWrites() is called.
1802: * This method does not block and returns immediately. To synchronize on
1803: * suspended writes completion, you must call
1804: * waitForSuspendedWritesToComplete().
1805: *
1806: * @see #resumeWrites()
1807: * @see #waitForSuspendedWritesToComplete()
1808: */
1809: public void suspendNewWrites() {
1810: if (logger.isDebugEnabled())
1811: logger.debug("Suspending new writes");
1812:
1813: synchronized (writesSync) {
1814: suspendWrites++;
1815: }
1816: }
1817:
1818: /**
1819: * @return Returns the suspendedWrites.
1820: */
1821: public boolean isSuspendedWrites() {
1822: return suspendWrites > 0;
1823: }
1824:
1825: /**
1826: * Wait for suspended writes to complete. Returns as soon as number of pending
1827: * writes has reached 0.
1828: *
1829: * @throws SQLException if an error occured during wait
1830: */
1831: public void waitForSuspendedWritesToComplete() throws SQLException {
1832: synchronized (writesSync) {
1833: if (pendingWrites == 0) {
1834: if (logger.isDebugEnabled())
1835: logger.debug("All writes suspended");
1836: return;
1837: }
1838: }
1839:
1840: long waitTime = INITIAL_WAIT_TIME;
1841: while (true) {
1842: synchronized (endOfCurrentWrites) {
1843: // Here we have a potential synchronization problem since the last
1844: // write completion could have happened before we entered this
1845: // synchronized block. Therefore we recheck if there is effectively
1846: // still pending writes. If this is not the case, we don't have
1847: // to sleep and we can immediately return.
1848: if (pendingWrites == 0) {
1849: if (logger.isDebugEnabled())
1850: logger.debug("All writes suspended");
1851: return;
1852: }
1853:
1854: if (logger.isDebugEnabled())
1855: logger.debug("Wait for " + pendingWrites
1856: + " writes to complete.");
1857:
1858: // Wait for pending writes to end
1859: try {
1860: endOfCurrentWrites.wait(waitTime);
1861: } catch (InterruptedException e) {
1862: String msg = Translate.get(
1863: "scheduler.suspend.writes.failed", e);
1864: logger.error(msg);
1865: throw new SQLException(msg);
1866: }
1867: }
1868: synchronized (writesSync) {
1869: if (pendingWrites == 0)
1870: break;
1871: else {
1872: logger.warn("Waiting for " + pendingWrites
1873: + " pending writes");
1874: waitTime *= 2;
1875: }
1876: }
1877: }
1878:
1879: if (logger.isDebugEnabled())
1880: logger.debug("All writes suspended");
1881: }
1882:
1883: /**
1884: * Resumes openinh and closing of persistent connections.
1885: */
1886: public void resumeOpenClosePersistentConnection() {
1887: synchronized (suspendOpenClosePersistentConnectionSync) {
1888: suspendOpenClosePersistentConnections--;
1889: if (suspendOpenClosePersistentConnections == 0)
1890: suspendOpenClosePersistentConnectionSync.notifyAll();
1891: }
1892: }
1893:
1894: /**
1895: * Resume new persistent connections creations that were suspended by
1896: * suspendNewPersistentConnections().
1897: *
1898: * @see #suspendNewPersistentConnections()
1899: */
1900: public final void resumeNewPersistentConnections() {
1901: if (logger.isDebugEnabled())
1902: logger.debug("Resuming new persistent connections");
1903:
1904: synchronized (persistentConnectionsSync) {
1905: suspendNewPersistentConnections--;
1906: if (suspendNewPersistentConnections < 0) {
1907: suspendNewPersistentConnections = 0;
1908: logger
1909: .error("Unexpected negative suspendedPersistentConnections in AbstractScheduler.resumeNewPersistentConnections()");
1910: }
1911: if (suspendNewPersistentConnections == 0) {
1912: // Wake up all pending persistent connections creation
1913: persistentConnectionsSync.notifyAll();
1914: }
1915: }
1916: }
1917:
1918: /**
1919: * Suspends open and closing of persistent connections.
1920: *
1921: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#closePersistentConnection(String,
1922: * long)
1923: */
1924: public void suspendOpenClosePersistentConnection() {
1925: synchronized (suspendOpenClosePersistentConnectionSync) {
1926: suspendOpenClosePersistentConnections++;
1927: }
1928: }
1929:
1930: /**
1931: * Suspend all new persistent connections creation. This method does not block
1932: * and returns immediately. New connections remain suspended until
1933: * resumeNewPersistentConnections() is called.
1934: *
1935: * @see #resumeNewPersistentConnections()
1936: * @see #waitForSuspendedPersistentConnectionsToComplete()
1937: */
1938: public void suspendNewPersistentConnections() {
1939: if (logger.isDebugEnabled())
1940: logger.debug("Suspending new persistent connections");
1941:
1942: synchronized (persistentConnectionsSync) {
1943: suspendNewPersistentConnections++;
1944: }
1945: }
1946:
1947: /**
1948: * Wait for opened persistent connections to complete. Returns as soon as
1949: * number of pending persistent connections has reached 0.
1950: *
1951: * @throws SQLException if an error occured during wait
1952: */
1953: public void waitForPersistentConnectionsToComplete()
1954: throws SQLException {
1955: synchronized (persistentConnectionsSync) {
1956: if (activePersistentConnections.isEmpty()) {
1957: if (logger.isDebugEnabled())
1958: logger.debug("All persistent connections closed");
1959: return;
1960: }
1961: }
1962:
1963: // Wait for persistent connections to end
1964: boolean checkForTimeout = waitForPersistentConnectionsTimeout > 0;
1965: long totalWaitTime = 0;
1966: synchronized (endOfCurrentPersistentConnections) {
1967: if (activePersistentConnections.isEmpty()) {
1968: if (logger.isDebugEnabled())
1969: logger.debug("All persistent connections closed");
1970: return;
1971: }
1972:
1973: if (logger.isDebugEnabled())
1974: logger.debug("Waiting for "
1975: + activePersistentConnections.size()
1976: + " persistent connections to be closed.");
1977:
1978: long waitTime = INITIAL_WAIT_TIME;
1979: long start;
1980: long realWait;
1981: while (!activePersistentConnections.isEmpty())
1982: try {
1983: start = System.currentTimeMillis();
1984: endOfCurrentPersistentConnections.wait(waitTime);
1985: realWait = System.currentTimeMillis() - start;
1986: totalWaitTime += realWait;
1987: if (logger.isWarnEnabled()
1988: && (activePersistentConnections.size() > 0)) {
1989: logger
1990: .warn("Waited for "
1991: + Math
1992: .round((totalWaitTime / 1000.0))
1993: + " secs but "
1994: + activePersistentConnections
1995: .size()
1996: + " persistent connections still open: "
1997: + activePersistentConnections
1998: .keySet());
1999: if (checkForTimeout)
2000: logger
2001: .warn("Will wait for "
2002: + Math
2003: .max(
2004: 0,
2005: Math
2006: .round((waitForPersistentConnectionsTimeout - totalWaitTime) / 1000.0))
2007: + " secs more and attempt to close them");
2008: }
2009: if (checkForTimeout
2010: && totalWaitTime >= waitForPersistentConnectionsTimeout)
2011: break;
2012: waitTime *= 2;
2013: if (checkForTimeout)
2014: waitTime = Math.min(waitTime,
2015: waitForPersistentConnectionsTimeout
2016: - totalWaitTime);
2017: } catch (InterruptedException e) {
2018: String msg = Translate.get(
2019: "scheduler.suspend.transaction.failed", e);
2020: logger.error(msg);
2021: throw new SQLException(msg);
2022: }
2023: }
2024: if (checkForTimeout
2025: && totalWaitTime >= waitForPersistentConnectionsTimeout) {
2026: if (logger.isWarnEnabled())
2027: logger
2028: .warn("Timeout reached ("
2029: + Math
2030: .round(waitForPersistentConnectionsTimeout / 1000.0)
2031: + " secs), closing remaining active persistent connections");
2032: closeRemainingPersistentConnections();
2033: }
2034: if (logger.isDebugEnabled())
2035: logger.debug("All persistent connections closed");
2036: }
2037:
2038: /**
2039: * Blocks until all pending open/close persistent connections operations are
2040: * completed.
2041: */
2042: public void waitForPendingOpenClosePersistentConnection() {
2043: synchronized (suspendOpenClosePersistentConnectionSync) {
2044: while (pendingOpenClosePersistentConnections > 0) {
2045: try {
2046: suspendOpenClosePersistentConnectionSync.wait();
2047: } catch (InterruptedException ignore) {
2048: }
2049: }
2050: }
2051: }
2052:
2053: /**
2054: * Adds an object to the suspended requests list.
2055: *
2056: * @param obj suspended request.
2057: */
2058: private void addSuspendedRequest(Object obj) {
2059: synchronized (suspendedRequests) {
2060: suspendedRequests.add(obj);
2061: }
2062: if (vdb.isDistributed()) { // Distributed virtual database only
2063: List totalOrderQueue = vdb.getTotalOrderQueue();
2064: synchronized (totalOrderQueue) {
2065: totalOrderQueue.notifyAll();
2066: }
2067: }
2068: }
2069:
2070: /**
2071: * Checks if an object is in the suspended requests list.
2072: *
2073: * @param obj request to be checked
2074: * @return true if the request is suspended, false otherwise
2075: */
2076: public boolean isSuspendedRequest(Object obj) {
2077: synchronized (suspendedRequests) {
2078: return suspendedRequests.contains(obj);
2079: }
2080: }
2081:
2082: /**
2083: * Removes all objects from the uspended requests list.
2084: */
2085: private void clearSuspendedRequests() {
2086: synchronized (suspendedRequests) {
2087: suspendedRequests.clear();
2088: }
2089: if (vdb.isDistributed()) { // Distributed virtual database only
2090: List totalOrderQueue = vdb.getTotalOrderQueue();
2091: synchronized (totalOrderQueue) {
2092: totalOrderQueue.notifyAll();
2093: }
2094: }
2095: }
2096:
2097: //
2098: // 7. Debug/Monitoring
2099: //
2100:
2101: protected abstract String getXmlImpl();
2102:
2103: /**
2104: * Get information about the Request Scheduler in xml format
2105: *
2106: * @return <code>String</code> containing information in xml
2107: */
2108: public String getXml() {
2109: StringBuffer info = new StringBuffer();
2110: info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">");
2111: info.append(this .getXmlImpl());
2112: info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">");
2113: return info.toString();
2114: }
2115:
2116: /**
2117: * Returns live information on the scheduler
2118: *
2119: * @return array of data
2120: */
2121: public String[] getSchedulerData() {
2122: String[] data = new String[7];
2123: data[0] = String.valueOf(numberRead);
2124: data[1] = String.valueOf(numberWrite);
2125: data[2] = String.valueOf(pendingTransactions);
2126: data[3] = String.valueOf(pendingWrites);
2127: data[4] = String.valueOf(numberRead + numberWrite);
2128: data[5] = String.valueOf(suspendTransactions);
2129: data[6] = String.valueOf(suspendWrites);
2130: return data;
2131: }
2132:
2133: /**
2134: * @return Returns the numberRead.
2135: */
2136: public int getNumberRead() {
2137: return numberRead;
2138: }
2139:
2140: /**
2141: * @return Returns the numberWrite.
2142: */
2143: public int getNumberWrite() {
2144: return numberWrite;
2145: }
2146:
2147: /**
2148: * @return Returns the pendingTransactions.
2149: */
2150: public int getPendingTransactions() {
2151: return pendingTransactions;
2152: }
2153:
2154: private void abortRemainingActiveTransactions() throws SQLException {
2155: List transactionsToAbort = new ArrayList();
2156: synchronized (writesSync) {
2157: transactionsToAbort.addAll(activeTransactions);
2158: }
2159: for (Iterator iter = transactionsToAbort.iterator(); iter
2160: .hasNext();) {
2161: long transactionId = ((TransactionMetaData) iter.next())
2162: .getTransactionId();
2163: if (logger.isWarnEnabled())
2164: logger.warn("Aborting transaction " + transactionId);
2165: vdb.abort(transactionId, true, true);
2166: }
2167: }
2168:
2169: private void closeRemainingPersistentConnections() {
2170: Map persistentConnectionsToClose = new HashMap();
2171: synchronized (endOfCurrentPersistentConnections) {
2172: persistentConnectionsToClose
2173: .putAll(activePersistentConnections);
2174: }
2175: for (Iterator iter = persistentConnectionsToClose.keySet()
2176: .iterator(); iter.hasNext();) {
2177: Long persistentConnectionId = (Long) iter.next();
2178: if (logger.isWarnEnabled())
2179: logger.warn("Closing persistent connection "
2180: + persistentConnectionId);
2181: vdb.closePersistentConnection(
2182: (String) persistentConnectionsToClose
2183: .get(persistentConnectionId),
2184: persistentConnectionId.longValue());
2185: }
2186: }
2187:
2188: }
|