0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Olivier Fambon, Jean-Bernard van Zuylen, Damian Arregui,
0023: * Peter Royal.
0024: */package org.continuent.sequoia.controller.requestmanager.distributed;
0025:
0026: import java.io.Serializable;
0027: import java.sql.SQLException;
0028: import java.util.ArrayList;
0029: import java.util.HashMap;
0030: import java.util.Iterator;
0031: import java.util.LinkedList;
0032: import java.util.List;
0033:
0034: import javax.management.NotCompliantMBeanException;
0035:
0036: import org.continuent.hedera.adapters.MulticastRequestAdapter;
0037: import org.continuent.hedera.adapters.MulticastResponse;
0038: import org.continuent.hedera.common.Member;
0039: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0040: import org.continuent.sequoia.common.exceptions.NoResultAvailableException;
0041: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
0042: import org.continuent.sequoia.common.i18n.Translate;
0043: import org.continuent.sequoia.common.jmx.management.BackendInfo;
0044: import org.continuent.sequoia.common.log.Trace;
0045: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0046: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0047: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0048: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0049: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0050: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0051: import org.continuent.sequoia.controller.cache.result.AbstractResultCache;
0052: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0053: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0054: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0055: import org.continuent.sequoia.controller.requestmanager.RequestManager;
0056: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0057: import org.continuent.sequoia.controller.requests.AbstractRequest;
0058: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0059: import org.continuent.sequoia.controller.requests.SelectRequest;
0060: import org.continuent.sequoia.controller.requests.StoredProcedure;
0061: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
0062: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0063: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
0064: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0065: import org.continuent.sequoia.controller.virtualdatabase.protocol.BlockActivity;
0066: import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint;
0067: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
0068: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0069: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0070: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0071: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0072: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0073: import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForPersistentConnection;
0074: import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForTransaction;
0075: import org.continuent.sequoia.controller.virtualdatabase.protocol.GetRequestResultFromFailoverCache;
0076: import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyCompletion;
0077: import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyDisableBackend;
0078: import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyEnableBackend;
0079: import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyInconsistency;
0080: import org.continuent.sequoia.controller.virtualdatabase.protocol.ResumeActivity;
0081: import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendActivity;
0082:
0083: /**
0084: * This class defines a Distributed Request Manager.
0085: * <p>
0086: * The DRM is composed of a Request Scheduler, an optional Query Cache, and a
0087: * Load Balancer and an optional Recovery Log. Unlike a non-distributed Request
0088: * Manager, this implementation is responsible for synchronizing the different
0089: * controllers components (schedulers, ...). Functions that are RAIDb level
0090: * dependent are implemented in sub-classes.
0091: *
0092: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0093: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0094: * </a>
0095: * @author <a href="mailto:Damian.Arregui@emicnetworks.com">Damian Arregui</a>
0096: * @version 1.0
0097: */
0098: public abstract class DistributedRequestManager extends RequestManager {
0099: protected DistributedVirtualDatabase dvdb;
0100: /**
0101: * List of queries that failed on all backends. Value contained in the map is
0102: * a boolean indicating whether the request has been scheduled or not before
0103: * the failure so that we know if the scheduler must be notified or not.
0104: */
0105: private HashMap failedOnAllBackends;
0106: /** Unique controller identifier */
0107: private long controllerId;
0108: /** List of transactions that have executed on multiple controllers */
0109: protected LinkedList distributedTransactions;
0110:
0111: /**
0112: * Constant to acknowledge the successful completion of a distributed query
0113: */
0114: public static final Integer SUCCESSFUL_COMPLETION = new Integer(-1);
0115:
/**
 * Builds a new <code>DistributedRequestManager</code> instance.
 * <p>
 * Every argument is forwarded to the <code>RequestManager</code>
 * constructor; the distributed virtual database reference and the
 * bookkeeping collections of this class are initialized afterwards.
 *
 * @param vdb the virtual database this request manager belongs to
 * @param scheduler the Request Scheduler to use
 * @param cache a Query Cache implementation
 * @param loadBalancer the Request Load Balancer to use
 * @param recoveryLog the Log Recovery to use
 * @param beginTimeout timeout in seconds for begin
 * @param commitTimeout timeout in seconds for commit
 * @param rollbackTimeout timeout in seconds for rollback
 * @throws SQLException if an error occurs
 * @throws NotCompliantMBeanException if this class is not a compliant JMX
 *           MBean
 */
public DistributedRequestManager(DistributedVirtualDatabase vdb,
    AbstractScheduler scheduler, AbstractResultCache cache,
    AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog,
    long beginTimeout, long commitTimeout, long rollbackTimeout)
    throws SQLException, NotCompliantMBeanException {
  super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
      commitTimeout, rollbackTimeout);
  this.failedOnAllBackends = new HashMap();
  this.distributedTransactions = new LinkedList();
  // Only available from here on: anything invoked by the super constructor
  // (see setScheduler) runs before this assignment.
  this.dvdb = vdb;
}
0143:
0144: //
0145: // Controller identifier related functions
0146: //
0147:
0148: /**
0149: * Effective controllerIds are on the upper 16 bits of a long (64 bits).
0150: * Distributed transaction ids (longs) are layed out as [ControllerId(16bits) |
0151: * LocalTransactionId(64bits)]. <br/>This constant used in
0152: * DistributedVirtualDatabase.
0153: */
0154: public static final long CONTROLLER_ID_BIT_MASK = 0xffff000000000000L;
0155: /**
0156: * TRANSACTION_ID_BIT_MASK is used to get the transaction id local to the
0157: * originating controller
0158: */
0159: public static final long TRANSACTION_ID_BIT_MASK = ~CONTROLLER_ID_BIT_MASK;
0160:
0161: /**
0162: * @see #CONTROLLER_ID_BIT_MASK
0163: */
0164: public static final int CONTROLLER_ID_SHIFT_BITS = 48;
0165:
0166: /**
0167: * @see #CONTROLLER_ID_BIT_MASK
0168: */
0169: public static final long CONTROLLER_ID_BITS = 0x000000000000ffffL;
0170:
0171: /**
0172: * Returns the unique controller identifier.
0173: *
0174: * @return Returns the controllerId.
0175: */
0176: public long getControllerId() {
0177: return controllerId;
0178: }
0179:
0180: /**
0181: * Sets the controller identifier value (this id must be unique). Parameter id
0182: * must hold on 16 bits (< 0xffff), otherwise an exception is thrown.
0183: * Effective this.controllerId is <strong>not </strong> set to passed
0184: * parameter id, but to id << ControllerIdShiftBits. The reason for all
0185: * this is that controllerIds are to be carried into ditributed transactions
0186: * ids, in the upper 16 bits.
0187: *
0188: * @param id The controllerId to set.
0189: */
0190: public void setControllerId(long id) {
0191: if ((id & ~CONTROLLER_ID_BITS) != 0) {
0192: String msg = "Out of range controller id (" + id + ")";
0193: logger.error(msg);
0194: throw new RuntimeException(msg);
0195: }
0196: this .controllerId = (id << CONTROLLER_ID_SHIFT_BITS)
0197: & CONTROLLER_ID_BIT_MASK;
0198: if (logger.isDebugEnabled())
0199: logger.debug("Setting controller identifier to " + id
0200: + " (shifted value is " + controllerId + ")");
0201:
0202: scheduler.setControllerId(controllerId);
0203: }
0204:
0205: /**
0206: * Make the given persistent connection id unique cluster-wide
0207: *
0208: * @param id original id
0209: * @return unique connection id
0210: */
0211: public long getNextConnectionId(long id) {
0212: // 2 first bytes are used for controller id
0213: // 6 right-most bytes are used for transaction id
0214: id = id & TRANSACTION_ID_BIT_MASK;
0215: id = id | controllerId;
0216: return id;
0217: }
0218:
0219: /**
0220: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#getNextRequestId()
0221: */
0222: public long getNextRequestId() {
0223: // We use the same bitmask as for transaction ids
0224:
0225: long id = super .getNextRequestId();
0226: // 2 first bytes are used for controller id
0227: // 6 right-most bytes are used for transaction id
0228: id = id & TRANSACTION_ID_BIT_MASK;
0229: id = id | controllerId;
0230: return id;
0231: }
0232:
0233: /**
0234: * Get the trace logger of this DistributedRequestManager
0235: *
0236: * @return a <code>Trace</code> object
0237: */
0238: public Trace getLogger() {
0239: return logger;
0240: }
0241:
0242: /**
0243: * Returns the vdb value.
0244: *
0245: * @return Returns the vdb.
0246: */
0247: public VirtualDatabase getVirtualDatabase() {
0248: return dvdb;
0249: }
0250:
0251: /**
0252: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setScheduler(org.continuent.sequoia.controller.scheduler.AbstractScheduler)
0253: */
0254: public void setScheduler(AbstractScheduler scheduler) {
0255: super .setScheduler(scheduler);
0256: // Note: don't try to use this.dvdb here: setScheduler is called by the
0257: // c'tor, and dvdb is not set at this time.
0258: if (vdb.getTotalOrderQueue() == null)
0259: throw new RuntimeException(
0260: "New scheduler does not support total ordering and is not compatible with distributed virtual databases.");
0261: }
0262:
0263: //
0264: // Database Backends management
0265: //
0266:
0267: /**
0268: * Enable a backend that has been previously added to this virtual database
0269: * and that is in the disabled state. We check we the other controllers if
0270: * this backend must be enabled in read-only or read-write. The current policy
0271: * is that the first one to enable this backend will have read-write access to
0272: * it and others will be in read-only.
0273: *
0274: * @param db The database backend to enable
0275: * @throws SQLException if an error occurs
0276: */
0277: public void enableBackend(DatabaseBackend db) throws SQLException {
0278: int size = dvdb.getAllMemberButUs().size();
0279: if (size > 0) {
0280: logger
0281: .debug(Translate
0282: .get("virtualdatabase.distributed.enable.backend.check"));
0283:
0284: try {
0285: // Notify other controllers that we enable this backend.
0286: // No answer is expected.
0287: dvdb.getMulticastRequestAdapter().multicastMessage(
0288: dvdb.getAllMemberButUs(),
0289: new NotifyEnableBackend(new BackendInfo(db)),
0290: MulticastRequestAdapter.WAIT_NONE,
0291: dvdb.getMessageTimeouts()
0292: .getEnableBackendTimeout());
0293: } catch (Exception e) {
0294: String msg = "Error while enabling backend "
0295: + db.getName();
0296: logger.error(msg, e);
0297: throw new SQLException(msg + "(" + e + ")");
0298: }
0299: }
0300:
0301: super .enableBackend(db);
0302: }
0303:
0304: /**
0305: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#disableBackend(org.continuent.sequoia.controller.backend.DatabaseBackend,
0306: * boolean)
0307: */
0308: public void disableBackend(DatabaseBackend db, boolean forceDisable)
0309: throws SQLException {
0310: int size = dvdb.getAllMemberButUs().size();
0311: if (size > 0) {
0312: logger.debug(Translate.get(
0313: "virtualdatabase.distributed.disable.backend", db
0314: .getName()));
0315:
0316: try {
0317: // Notify other controllers that we disable this backend.
0318: // No answer is expected.
0319: dvdb.getMulticastRequestAdapter().multicastMessage(
0320: dvdb.getAllMemberButUs(),
0321: new NotifyDisableBackend(new BackendInfo(db)),
0322: MulticastRequestAdapter.WAIT_NONE,
0323: dvdb.getMessageTimeouts()
0324: .getDisableBackendTimeout());
0325: } catch (Exception e) {
0326: String msg = "Error while disabling backend "
0327: + db.getName();
0328: logger.error(msg, e);
0329: throw new SQLException(msg + "(" + e + ")");
0330: }
0331: }
0332:
0333: super .disableBackend(db, forceDisable);
0334: }
0335:
0336: /**
0337: * {@inheritDoc}
0338: *
0339: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#disableBackendsWithCheckpoint(java.util.ArrayList,
0340: * java.lang.String)
0341: */
0342: public void disableBackendsWithCheckpoint(ArrayList backendInfos,
0343: String checkpointName) throws SQLException {
0344: // Perform the distributed call through the group-comm, in order to
0345: // atomically disable the backend and store a cluster-wide checkpoint.
0346: try {
0347: // Suspend transactions
0348: suspendActivity();
0349:
0350: dvdb.sendMessageToControllers(dvdb.getAllMembers(),
0351: new DisableBackendsAndSetCheckpoint(backendInfos,
0352: checkpointName), dvdb.getMessageTimeouts()
0353: .getDisableBackendTimeout());
0354: } catch (Exception e) {
0355: String msg = "Error while disabling backends "
0356: + backendInfos;
0357: logger.error(msg, e);
0358: throw new SQLException(msg + "(" + e + ")");
0359: } finally {
0360: resumeActivity();
0361: }
0362: }
0363:
0364: //
0365: // Failover management
0366: //
0367:
0368: private class FailureInformation {
0369: private boolean needSchedulerNotification;
0370: private long logId = -1;
0371: private boolean success;
0372: private boolean disableBackendOnSuccess;
0373: private int updateCount;
0374:
0375: /**
0376: * Creates a new <code>FailureInformation</code> object storing the
0377: * information about a failure that waits for a remote controller final
0378: * status.
0379: *
0380: * @param needSchedulerNotification true if the scheduler must be notified
0381: * @param logId the recovery log id of the query
0382: */
0383: public FailureInformation(boolean needSchedulerNotification,
0384: long logId) {
0385: this .needSchedulerNotification = needSchedulerNotification;
0386: this .logId = logId;
0387: }
0388:
0389: /**
0390: * Creates a new <code>FailureInformation</code> object storing the final
0391: * status from a remote controller. This version of the constructor is used
0392: * when the remote controller sends the final status before the local
0393: * controller has completed the request. This can happen in the case of a
0394: * timeout.
0395: *
0396: * @param success indicates the result of the operation on the remote
0397: * controller
0398: * @param disableBackendOnSuccess indicates whether or not the backend
0399: * should be disabled.
0400: * @param updateCount the update count for the request
0401: */
0402: public FailureInformation(boolean success,
0403: boolean disableBackendOnSuccess, int updateCount) {
0404: this .success = success;
0405: this .disableBackendOnSuccess = disableBackendOnSuccess;
0406: this .updateCount = updateCount;
0407: }
0408:
0409: /**
0410: * Returns the recovery log id value.
0411: *
0412: * @return the recovery log id.
0413: */
0414: public final long getLogId() {
0415: return logId;
0416: }
0417:
0418: /**
0419: * Sets the local logId for the request
0420: *
0421: * @param logId the log id to set
0422: */
0423: public void setLogId(long logId) {
0424: this .logId = logId;
0425: }
0426:
0427: /**
0428: * Returns true if scheduler notification is needed.
0429: *
0430: * @return true if scheduler notification is needed.
0431: */
0432: public final boolean needSchedulerNotification() {
0433: return needSchedulerNotification;
0434: }
0435:
0436: /**
0437: * sets the local scheduler notification indicator
0438: *
0439: * @param needSchedulerNotification true if scheduler notification is
0440: * needed.
0441: */
0442: public void setNeedSchedulerNotification(
0443: boolean needSchedulerNotification) {
0444: this .needSchedulerNotification = needSchedulerNotification;
0445: }
0446:
0447: /**
0448: * Indicates whether or not the backend should be disabled.
0449: *
0450: * @return true if backend must be disabled
0451: */
0452: public boolean isDisableBackendOnSuccess() {
0453: return disableBackendOnSuccess;
0454: }
0455:
0456: /**
0457: * Indicates whether or not the query was successful on the remote
0458: * controller.
0459: *
0460: * @return true if the query was successful on one of the remote
0461: * controllers.
0462: */
0463: public boolean isSuccess() {
0464: return success;
0465: }
0466:
0467: /**
0468: * Returns the update count (only meaningful if this was a request returning
0469: * an update count)
0470: *
0471: * @return update count
0472: */
0473: public int getUpdateCount() {
0474: return updateCount;
0475: }
0476:
0477: }
0478:
0479: private void logRequestCompletionAndNotifyScheduler(
0480: AbstractRequest request, boolean success,
0481: FailureInformation failureInfo, int updateCount) {
0482: // Update recovery log with completion information
0483: if (recoveryLog != null) {
0484: boolean mustLog = !request.isReadOnly();
0485: if (request instanceof StoredProcedure) {
0486: DatabaseProcedureSemantic semantic = ((StoredProcedure) request)
0487: .getSemantic();
0488: mustLog = (semantic == null) || semantic.isWrite();
0489: }
0490: if (mustLog && failureInfo.getLogId() != 0)
0491: recoveryLog.logRequestCompletion(
0492: failureInfo.getLogId(), success, request
0493: .getExecTimeInMs(), updateCount);
0494: }
0495:
0496: if (failureInfo.needSchedulerNotification()) {
0497: try {
0498: // Notify scheduler now, the notification was postponed when
0499: // addFailedOnAllBackends was called.
0500: if (request instanceof StoredProcedure)
0501: scheduler
0502: .storedProcedureCompleted((StoredProcedure) request);
0503: else if (!request.isAutoCommit()
0504: && (request instanceof UnknownWriteRequest)) {
0505: String sql = request.getSqlOrTemplate();
0506: TransactionMetaData tm = new TransactionMetaData(
0507: request.getTransactionId(), 0, request
0508: .getLogin(), request
0509: .isPersistentConnection(), request
0510: .getPersistentConnectionId());
0511: if ("commit".equals(sql))
0512: scheduler.commitCompleted(tm, success);
0513: else if ("rollback".equals(sql)
0514: || "abort".equals(sql))
0515: scheduler.rollbackCompleted(tm, success);
0516: else if (sql.startsWith("rollback")) // rollback to savepoint
0517: scheduler.savepointCompleted(tm
0518: .getTransactionId());
0519: else if (sql.startsWith("release "))
0520: scheduler.savepointCompleted(tm
0521: .getTransactionId());
0522: else if (sql.startsWith("savepoint "))
0523: scheduler.savepointCompleted(tm
0524: .getTransactionId());
0525: else
0526: // Real UnknownWriteRequest
0527: scheduler
0528: .writeCompleted((AbstractWriteRequest) request);
0529: } else
0530: // Just an AbstractWriteRequest
0531: scheduler
0532: .writeCompleted((AbstractWriteRequest) request);
0533: } catch (SQLException e) {
0534: logger.warn("Failed to notify scheduler for request "
0535: + request, e);
0536: }
0537: }
0538: }
0539:
0540: /**
0541: * Add a request that failed on all backends.
0542: *
0543: * @param request the request that failed
0544: * @param needSchedulerNotification true if the request has been scheduled but
0545: * the scheduler has not been notified yet of the request completion
0546: * @see #completeFailedOnAllBackends(AbstractRequest, boolean, boolean)
0547: */
0548: public void addFailedOnAllBackends(AbstractRequest request,
0549: boolean needSchedulerNotification) {
0550: synchronized (failedOnAllBackends) {
0551: /*
0552: * Failure information may already exist if the request was timed out at
0553: * the originating controller. In which case, we have all the information
0554: * required to complete the request.
0555: */
0556: FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
0557: .get(request);
0558: if (failureInfo == null)
0559: failedOnAllBackends.put(request,
0560: new FailureInformation(
0561: needSchedulerNotification, request
0562: .getLogId()));
0563: else {
0564: failureInfo.setLogId(request.getLogId());
0565: failureInfo
0566: .setNeedSchedulerNotification(needSchedulerNotification);
0567: completeFailedOnAllBackends(request, failureInfo
0568: .isSuccess(), failureInfo
0569: .isDisableBackendOnSuccess(),
0570: failureInfo.updateCount);
0571: }
0572: }
0573: }
0574:
0575: /**
0576: * Cleanup all queries registered as failed issued from the given controller.
0577: * This is used when a controller has failed and no status information will be
0578: * returned for failed queries. Queries are systematically tagged as failed.
0579: * <p>
0580: * FIXME: This is only correct with 2 controllers, in a 3+ controller
0581: * scenario, we need to check from other controllers if someone succeeded.
0582: *
0583: * @param failedControllerId id of the controller that has failed
0584: */
0585: public void cleanupAllFailedQueriesFromController(
0586: long failedControllerId) {
0587: synchronized (failedOnAllBackends) {
0588: for (Iterator iter = failedOnAllBackends.keySet()
0589: .iterator(); iter.hasNext();) {
0590: AbstractRequest request = (AbstractRequest) iter.next();
0591: if (((request.getId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)
0592: || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)
0593: || (request.isPersistentConnection() && (request
0594: .getPersistentConnectionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)) { // Need to remove that entry
0595: FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
0596: .get(request);
0597: // failedOnAllBackends can contain completion status information for
0598: // requests that failed before we started processing processing
0599: // requests. These entries do not have a logId and should be ignored.
0600: if (failureInfo.getLogId() > 0) {
0601: if (logger.isInfoEnabled())
0602: logger
0603: .info("No status information received for request "
0604: + request
0605: + ", considering status as failed.");
0606:
0607: // If the current request is a rollback / abort, we need to call
0608: // logRequestCompletionAndNotifyScheduler with success set to true
0609: // for the transaction to be correctly cleaned up
0610: boolean isAbortOrRollback = (request instanceof UnknownWriteRequest)
0611: && ("rollback".equals(request
0612: .getSqlOrTemplate()) || "abort"
0613: .equals(request
0614: .getSqlOrTemplate()));
0615:
0616: logRequestCompletionAndNotifyScheduler(request,
0617: isAbortOrRollback, failureInfo, -1);
0618: }
0619: iter.remove();
0620: }
0621: }
0622: }
0623: }
0624:
0625: /**
0626: * Notify completion of a request that either failed on all backends or was
0627: * not executed at all (NoMoreBackendException). If completion was successful
0628: * and query failed on all backends locally, all local backends are disabled
0629: * if disabledBackendOnSuccess is true.
0630: *
0631: * @param request request that completed
0632: * @param success true if completion is successful
0633: * @param disableBackendOnSuccess disable all local backends if query was
0634: * successful (but failed locally)
0635: * @param updateCount request update count to be logged in recovery log
0636: * @see #addFailedOnAllBackends(AbstractRequest, boolean)
0637: */
0638: public void completeFailedOnAllBackends(AbstractRequest request,
0639: boolean success, boolean disableBackendOnSuccess,
0640: int updateCount) {
0641: FailureInformation failureInfo;
0642: synchronized (failedOnAllBackends) {
0643: failureInfo = (FailureInformation) failedOnAllBackends
0644: .remove(request);
0645: if (failureInfo == null) {
0646: /*
0647: * If we can't find failureInformation, assume the remote controller
0648: * failed the request before it completed locally. This is probably due
0649: * to a timeout.
0650: */
0651: failureInfo = new FailureInformation(success,
0652: disableBackendOnSuccess, updateCount);
0653: failedOnAllBackends.put(request, failureInfo);
0654:
0655: logger
0656: .info("Unable to find request "
0657: + request.getSqlShortForm(dvdb
0658: .getSqlShortFormLength())
0659: + " in list of requests that failed on all backends.");
0660: return;
0661: }
0662: }
0663: logRequestCompletionAndNotifyScheduler(request, success,
0664: failureInfo, updateCount);
0665:
0666: if (disableBackendOnSuccess && success) {
0667: // Now really disable the backends
0668: String message = "Request "
0669: + request.getSqlShortForm(dvdb
0670: .getSqlShortFormLength())
0671: + " failed on all local backends but succeeded on other controllers. Disabling all local backends.";
0672: logger.error(message);
0673: endUserLogger.error(message);
0674:
0675: try {
0676: dvdb.disableAllBackends(true);
0677: } catch (VirtualDatabaseException e) {
0678: logger
0679: .error(
0680: "An error occured while disabling all backends",
0681: e);
0682: }
0683: }
0684: }
0685:
0686: /**
0687: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#closePersistentConnection(java.lang.String,
0688: * long)
0689: */
0690: public void distributedClosePersistentConnection(String login,
0691: long persistentConnectionId) {
0692: List groupMembers = dvdb.getCurrentGroup().getMembers();
0693:
0694: if (logger.isDebugEnabled())
0695: logger.debug("Broadcasting closing persistent connection "
0696: + persistentConnectionId + " for user " + login
0697: + " to all controllers ("
0698: + dvdb.getChannel().getLocalMembership() + "->"
0699: + groupMembers.toString() + ")");
0700:
0701: // Send the query to everybody including us
0702: try {
0703: dvdb.getMulticastRequestAdapter().multicastMessage(
0704: groupMembers,
0705: new DistributedClosePersistentConnection(login,
0706: persistentConnectionId),
0707: MulticastRequestAdapter.WAIT_ALL, 0);
0708: } catch (Exception e) {
0709: String msg = "An error occured while executing distributed persistent connection "
0710: + persistentConnectionId + " closing";
0711: logger.warn(msg, e);
0712: }
0713:
0714: if (logger.isDebugEnabled())
0715: logger.debug("Persistent connection "
0716: + persistentConnectionId + " closed.");
0717: }
0718:
0719: /**
0720: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#openPersistentConnection(String,
0721: * long)
0722: */
0723: public void distributedOpenPersistentConnection(String login,
0724: long persistentConnectionId) throws SQLException {
0725: List groupMembers = dvdb.getCurrentGroup().getMembers();
0726:
0727: if (logger.isDebugEnabled())
0728: logger.debug("Broadcasting opening persistent connection "
0729: + persistentConnectionId + " for user " + login
0730: + " to all controllers ("
0731: + dvdb.getChannel().getLocalMembership() + "->"
0732: + groupMembers.toString() + ")");
0733:
0734: boolean success = false;
0735: Exception exception = null;
0736: try {
0737: // Send the query to everybody including us
0738: MulticastResponse responses = dvdb
0739: .getMulticastRequestAdapter().multicastMessage(
0740: groupMembers,
0741: new DistributedOpenPersistentConnection(
0742: login, persistentConnectionId),
0743: MulticastRequestAdapter.WAIT_ALL, 0);
0744:
0745: // get a list that won't change while we go through it
0746: groupMembers = dvdb.getAllMembers();
0747: int size = groupMembers.size();
0748: ArrayList failedControllers = null;
0749: // Get the result of each controller
0750: for (int i = 0; i < size; i++) {
0751: Member member = (Member) groupMembers.get(i);
0752: if ((responses.getFailedMembers() != null)
0753: && responses.getFailedMembers()
0754: .contains(member)) {
0755: logger.warn("Controller " + member
0756: + " is suspected of failure.");
0757: continue;
0758: }
0759: Object r = responses.getResult(member);
0760: if (r instanceof Boolean) {
0761: if (((Boolean) r).booleanValue())
0762: success = true;
0763: else
0764: logger
0765: .error("Unexpected result for controller "
0766: + member);
0767: } else if (r instanceof Exception) {
0768: if (failedControllers == null)
0769: failedControllers = new ArrayList();
0770: failedControllers.add(member);
0771: if (exception == null)
0772: exception = (Exception) r;
0773: if (logger.isDebugEnabled())
0774: logger
0775: .debug("Controller "
0776: + member
0777: + " failed to open persistent connection "
0778: + persistentConnectionId + " ("
0779: + r + ")");
0780: }
0781: }
0782:
0783: /*
0784: * Notify all controllers where all backend failed (if any) that
0785: * completion was 'success'.
0786: */
0787: if (failedControllers != null) {
0788: UnknownWriteRequest notifRequest = new UnknownWriteRequest(
0789: "open " + persistentConnectionId, false, 0,
0790: null);
0791: notifyRequestCompletion(notifRequest, success, false,
0792: failedControllers);
0793: }
0794: } catch (Exception e) {
0795: String msg = "An error occured while executing distributed persistent connection "
0796: + persistentConnectionId + " opening";
0797: logger.warn(msg, e);
0798: }
0799:
0800: if (success) {
0801: if (logger.isDebugEnabled())
0802: logger.debug("Persistent connection "
0803: + persistentConnectionId + " opened.");
0804: return; // This is a success if at least one controller has succeeded
0805: }
0806:
0807: // At this point, all controllers failed
0808: String msg = "Failed to open persistent connection "
0809: + persistentConnectionId + " on all controllers ("
0810: + exception + ")";
0811: logger.warn(msg);
0812: throw new SQLException(msg);
0813: }
0814:
0815: /**
0816: * Notify to all members the failover for the specified persistent connection.
0817: *
0818: * @param persistentConnectionId persistent connection id
0819: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForPersistentConnection(long)
0820: */
0821: public void distributedFailoverForPersistentConnection(
0822: long persistentConnectionId) {
0823: List groupMembers = dvdb.getCurrentGroup().getMembers();
0824:
0825: if (logger.isDebugEnabled())
0826: logger
0827: .debug("Broadcasting failover for persistent connection "
0828: + persistentConnectionId
0829: + " to all controllers ("
0830: + dvdb.getChannel().getLocalMembership()
0831: + "->" + groupMembers.toString() + ")");
0832:
0833: // Send the query to everybody including us
0834: try {
0835: dvdb.getMulticastRequestAdapter().multicastMessage(
0836: groupMembers,
0837: new FailoverForPersistentConnection(
0838: persistentConnectionId),
0839: MulticastRequestAdapter.WAIT_ALL, 0);
0840: } catch (Exception e) {
0841: String msg = "An error occured while notifying distributed persistent connection "
0842: + persistentConnectionId + " failover";
0843: logger.warn(msg, e);
0844: }
0845: }
0846:
0847: /**
0848: * Notify to all members the failover for the specified transaction.
0849: *
0850: * @param currentTid transaction id
0851: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForTransaction(long)
0852: */
0853: public void distributedFailoverForTransaction(long currentTid) {
0854: List groupMembers = dvdb.getCurrentGroup().getMembers();
0855:
0856: if (logger.isDebugEnabled())
0857: logger.debug("Broadcasting failover for transaction "
0858: + currentTid + " to all controllers ("
0859: + dvdb.getChannel().getLocalMembership() + "->"
0860: + groupMembers.toString() + ")");
0861:
0862: // Send the query to everybody including us
0863: try {
0864: dvdb.getMulticastRequestAdapter().multicastMessage(
0865: groupMembers,
0866: new FailoverForTransaction(currentTid),
0867: MulticastRequestAdapter.WAIT_ALL, 0);
0868: } catch (Exception e) {
0869: String msg = "An error occured while notifying distributed persistent connection "
0870: + currentTid + " failover";
0871: logger.warn(msg, e);
0872: }
0873: }
0874:
0875: //
0876: // Transaction management
0877: //
0878:
0879: /**
0880: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#abort(long,
0881: * boolean, boolean)
0882: */
0883: public void abort(long transactionId, boolean logAbort,
0884: boolean forceAbort) throws SQLException {
0885: Long lTid = new Long(transactionId);
0886: TransactionMetaData tm;
0887: try {
0888: tm = getTransactionMetaData(lTid);
0889: if (!forceAbort && tidSavepoints.get(lTid) != null) {
0890: if (logger.isDebugEnabled())
0891: logger
0892: .debug("Transaction "
0893: + transactionId
0894: + " has savepoints, transaction will not be aborted");
0895: return;
0896: }
0897: } catch (SQLException e1) {
0898: logger
0899: .warn("No transaction metadata found to abort transaction "
0900: + transactionId
0901: + ". Creating a fake context for abort.");
0902: // We ignore the persistent connection id here (retrieved by connection
0903: // manager)
0904: tm = new TransactionMetaData(transactionId, 0,
0905: RecoveryLog.UNKNOWN_USER, false, 0);
0906: if (tidSavepoints.get(lTid) != null) {
0907: if (logger.isDebugEnabled())
0908: logger
0909: .debug("Transaction "
0910: + transactionId
0911: + " has savepoints, transaction will not be aborted");
0912: return;
0913: }
0914: }
0915:
0916: boolean isAWriteTransaction;
0917: synchronized (distributedTransactions) {
0918: isAWriteTransaction = distributedTransactions
0919: .contains(lTid);
0920: }
0921: if (isAWriteTransaction) {
0922: distributedAbort(tm.getLogin(), transactionId);
0923: } else {
0924: // read-only transaction, it is local but we still have to post the query
0925: // in the total order queue. Note that we post a Rollback object because
0926: // the load balancer will treat the abort as a rollback.
0927: LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
0928: synchronized (totalOrderQueue) {
0929: totalOrderQueue.addLast(new DistributedRollback(tm
0930: .getLogin(), transactionId));
0931: }
0932: super .abort(transactionId, logAbort, forceAbort);
0933: }
0934: }
0935:
0936: /**
0937: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#begin(String,
0938: * boolean, long) overrides RequestManager.begin(String) to apply bit
0939: * masks to the tid returned by the scheduler
0940: */
0941: public long begin(String login, boolean isPersistentConnection,
0942: long persistentConnectionId) throws SQLException {
0943: long tid = scheduler.getNextTransactionId();
0944: // 2 first bytes are used for controller id
0945: // 6 right-most bytes are used for transaction id
0946: tid = tid & TRANSACTION_ID_BIT_MASK;
0947: tid = tid | controllerId;
0948: doBegin(login, tid, isPersistentConnection,
0949: persistentConnectionId);
0950: return tid;
0951: }
0952:
0953: /**
0954: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#commit(long,
0955: * boolean, boolean)
0956: */
0957: public void commit(long transactionId, boolean logCommit,
0958: boolean emptyTransaction) throws SQLException {
0959: Long lTid = new Long(transactionId);
0960: TransactionMetaData tm = getTransactionMetaData(lTid);
0961: boolean isAWriteTransaction;
0962: synchronized (distributedTransactions) {
0963: isAWriteTransaction = distributedTransactions
0964: .contains(lTid);
0965: }
0966: if (isAWriteTransaction) {
0967: distributedCommit(tm.getLogin(), transactionId);
0968: } else {
0969: // read-only transaction, it is local
0970: DistributedCommit commit = new DistributedCommit(tm
0971: .getLogin(), transactionId);
0972: if (!emptyTransaction) {
0973: LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
0974: synchronized (totalOrderQueue) {
0975: totalOrderQueue.addLast(commit);
0976: }
0977: }
0978: try {
0979: super
0980: .commit(transactionId, logCommit,
0981: emptyTransaction);
0982: } catch (SQLException e) {
0983: if (logger.isWarnEnabled()) {
0984: logger
0985: .warn("Ignoring failure of commit for read-only transaction, exception was: "
0986: + e);
0987: }
0988:
0989: // Force transaction completion on scheduler
0990: scheduler.commit(tm, emptyTransaction, commit);
0991: scheduler.commitCompleted(tm, true);
0992:
0993: // Clean-up transactional context
0994: completeTransaction(lTid);
0995: }
0996: }
0997: }
0998:
0999: /**
1000: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#completeTransaction(java.lang.Long)
1001: */
1002: public void completeTransaction(Long tid) {
1003: synchronized (distributedTransactions) {
1004: distributedTransactions.remove(tid);
1005: }
1006: super .completeTransaction(tid);
1007: }
1008:
1009: /**
1010: * Check if the transaction corresponding to the given query has been started
1011: * remotely and start the transaction locally in a lazy manner if needed. This
1012: * also checks if a local request must trigger the logging of a 'begin' in the
1013: * recovery log.
1014: *
1015: * @param request query to execute
1016: * @throws SQLException if an error occurs
1017: */
1018: public void lazyTransactionStart(AbstractRequest request)
1019: throws SQLException {
1020: // Check if this is a remotely started transaction that we need to lazily
1021: // start locally. Note that we cannot decide from its id that a transaction
1022: // has been started remotely. In a failover case the client still uses the
1023: // transaction id given by the original controller. See SEQUOIA-930.
1024: if (!request.isAutoCommit()) {
1025: long tid = request.getTransactionId();
1026: Long lTid = new Long(tid);
1027: TransactionMetaData tm = (TransactionMetaData) transactionMetaDatas
1028: .get(lTid);
1029:
1030: // Check if transaction is started
1031: if (tm != null) {
1032: /*
1033: * It may have been started by a failover before any writes were
1034: * executed.
1035: */
1036: if (tm.isReadOnly()) {
1037: request.setIsLazyTransactionStart(true);
1038: tm.setReadOnly(false);
1039: }
1040: return; // transaction already started
1041: }
1042: // Begin this transaction
1043: try {
1044: tm = new TransactionMetaData(tid, beginTimeout, request
1045: .getLogin(), request.isPersistentConnection(),
1046: request.getPersistentConnectionId());
1047: tm.setReadOnly(false);
1048:
1049: if (logger.isDebugEnabled())
1050: logger.debug(Translate.get(
1051: "transaction.begin.lazy", String
1052: .valueOf(tid)));
1053:
1054: scheduler.begin(tm, true, request);
1055:
1056: try {
1057: // Send to load balancer
1058: loadBalancer.begin(tm);
1059:
1060: // We need to update the tid table first so that
1061: // logLazyTransactionBegin can retrieve the metadata
1062: transactionMetaDatas.put(lTid, tm);
1063: request.setIsLazyTransactionStart(true);
1064:
1065: synchronized (distributedTransactions) {
1066: if (!distributedTransactions.contains(lTid))
1067: distributedTransactions.add(lTid);
1068: }
1069: } catch (SQLException e) {
1070: if (recoveryLog != null)
1071: // In case logLazyTransactionBegin failed
1072: transactionMetaDatas.remove(lTid);
1073: throw e;
1074: } finally {
1075: // Notify scheduler for completion in any case
1076: scheduler.beginCompleted(tid);
1077: }
1078: } catch (RuntimeException e) {
1079: String msg = Translate
1080: .get("fatal.runtime.exception.requestmanager.begin");
1081: logger.fatal(msg, e);
1082: endUserLogger.fatal(msg);
1083: throw new SQLException(e.getMessage());
1084: }
1085: }
1086: }
1087:
1088: /**
1089: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#rollback(long,
1090: * boolean)
1091: */
1092: public void rollback(long transactionId, boolean logRollback)
1093: throws SQLException {
1094: Long lTid = new Long(transactionId);
1095: TransactionMetaData tm = getTransactionMetaData(lTid);
1096: boolean isAWriteTransaction;
1097: synchronized (distributedTransactions) {
1098: isAWriteTransaction = distributedTransactions
1099: .contains(lTid);
1100: }
1101: if (isAWriteTransaction) {
1102: distributedRollback(tm.getLogin(), transactionId);
1103: } else {
1104: // read-only transaction, it is local
1105: DistributedRollback rollback = new DistributedRollback(tm
1106: .getLogin(), transactionId);
1107: LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
1108: synchronized (totalOrderQueue) {
1109: totalOrderQueue.addLast(rollback);
1110: }
1111: try {
1112: super .rollback(transactionId, logRollback);
1113: } catch (SQLException e) {
1114: if (logger.isWarnEnabled()) {
1115: logger
1116: .warn("Ignoring failure of rollback for read-only transaction, exception was: "
1117: + e);
1118: }
1119:
1120: // Force transaction completion on scheduler
1121: try {
1122: scheduler.rollback(tm, rollback);
1123: } catch (SQLException ignore) {
1124: }
1125: scheduler.rollbackCompleted(tm, true);
1126:
1127: // Clean-up transactional context
1128: completeTransaction(lTid);
1129: }
1130: }
1131: }
1132:
1133: /**
1134: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#rollback(long,
1135: * String)
1136: */
1137: public void rollback(long transactionId, String savepointName)
1138: throws SQLException {
1139: Long lTid = new Long(transactionId);
1140: boolean isAWriteTransaction;
1141: synchronized (distributedTransactions) {
1142: isAWriteTransaction = distributedTransactions
1143: .contains(lTid);
1144: }
1145: if (isAWriteTransaction) {
1146: TransactionMetaData tm = getTransactionMetaData(lTid);
1147: distributedRollback(tm.getLogin(), transactionId,
1148: savepointName);
1149: } else { // read-only transaction, it is local
1150: LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
1151: synchronized (totalOrderQueue) {
1152: totalOrderQueue
1153: .addLast(new DistributedRollbackToSavepoint(
1154: transactionId, savepointName));
1155: }
1156: super .rollback(transactionId, savepointName);
1157: }
1158: }
1159:
1160: /**
1161: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setSavepoint(long)
1162: */
1163: public int setSavepoint(long transactionId) throws SQLException {
1164: Long lTid = new Long(transactionId);
1165: int savepointId = scheduler.incrementSavepointId();
1166: TransactionMetaData tm = getTransactionMetaData(lTid);
1167: synchronized (distributedTransactions) {
1168: if (!distributedTransactions.contains(lTid))
1169: distributedTransactions.add(lTid);
1170: }
1171: distributedSetSavepoint(tm.getLogin(), transactionId, String
1172: .valueOf(savepointId));
1173: return savepointId;
1174: }
1175:
1176: /**
1177: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setSavepoint(long,
1178: * String)
1179: */
1180: public void setSavepoint(long transactionId, String name)
1181: throws SQLException {
1182: Long lTid = new Long(transactionId);
1183: TransactionMetaData tm = getTransactionMetaData(lTid);
1184: synchronized (distributedTransactions) {
1185: if (!distributedTransactions.contains(lTid))
1186: distributedTransactions.add(lTid);
1187: }
1188: distributedSetSavepoint(tm.getLogin(), transactionId, name);
1189: }
1190:
1191: /**
1192: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#releaseSavepoint(long,
1193: * String)
1194: */
1195: public void releaseSavepoint(long transactionId, String name)
1196: throws SQLException {
1197: Long lTid = new Long(transactionId);
1198: boolean isAWriteTransaction;
1199: synchronized (distributedTransactions) {
1200: isAWriteTransaction = distributedTransactions
1201: .contains(lTid);
1202: }
1203: if (isAWriteTransaction) {
1204: TransactionMetaData tm = getTransactionMetaData(lTid);
1205: distributedReleaseSavepoint(tm.getLogin(), transactionId,
1206: name);
1207: } else {
1208: // read-only transaction, it is local
1209: LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
1210: synchronized (totalOrderQueue) {
1211: totalOrderQueue
1212: .addLast(new DistributedReleaseSavepoint(
1213: transactionId, name));
1214: }
1215: super .releaseSavepoint(transactionId, name);
1216: }
1217: }
1218:
1219: /**
 * Add this transaction to the list of write transactions that need to be
 * globally committed. This happens if the transaction has only been started
 * locally but not through a lazy start.
1223: */
1224: private void addToDistributedTransactionListIfNeeded(
1225: AbstractRequest request) {
1226: // Add to distributed transactions if needed
1227: if (!request.isAutoCommit()) {
1228: Long lTid = new Long(request.getTransactionId());
1229: synchronized (distributedTransactions) {
1230: if (!distributedTransactions.contains(lTid))
1231: distributedTransactions.add(lTid);
1232: }
1233: }
1234: }
1235:
1236: /**
1237: * Retrieve the vLogin corresponding to the persistent connection id provided
1238: * and close the connection if found. This is used by the
 * ControllerFailureCleanupThread to clean up remaining persistent connections
1240: * from a failed controller whose clients never recovered.
1241: *
1242: * @param connectionId the persistent connection id
1243: */
1244: public void closePersistentConnection(Long connectionId) {
1245: String vLogin = scheduler
1246: .getPersistentConnectionLogin(connectionId);
1247: if (vLogin != null)
1248: super .closePersistentConnection(vLogin, connectionId
1249: .longValue());
1250: }
1251:
1252: /**
1253: * Performs a local read operation, as opposed to execReadRequest() which
1254: * attempts to use distributed reads when there is NoMoreBackendException.
1255: *
1256: * @param request the read request to perform
1257: * @return a ControllerResultSet
1258: * @throws NoMoreBackendException when no more local backends are available to
1259: * execute the request
1260: * @throws SQLException in case of error
1261: */
1262: public ControllerResultSet execLocalStatementExecuteQuery(
1263: SelectRequest request) throws NoMoreBackendException,
1264: SQLException {
1265: return super .statementExecuteQuery(request);
1266: }
1267:
/**
 * Execute a read request on a remote controller of the group. This is the
 * fallback used by {@link #statementExecuteQuery(SelectRequest)} when the
 * local execution fails with a NoMoreBackendException, i.e. when the local
 * controller has no backend available to execute the request.
 *
 * @param request the request to execute
 * @return the query ResultSet
 * @throws SQLException in case of bad request
 */
public abstract ControllerResultSet execRemoteStatementExecuteQuery(
SelectRequest request) throws SQLException;
1278:
1279: /**
1280: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest)
1281: */
1282: public ControllerResultSet statementExecuteQuery(
1283: SelectRequest request) throws SQLException {
1284: if (!request.isMustBroadcast()) {
1285: try {
1286: return execLocalStatementExecuteQuery(request);
1287: } catch (SQLException e) {
1288: if (!(e instanceof NoMoreBackendException))
1289: throw e;
1290: // else this failed locally, try it remotely
1291: // Request failed locally, try on other controllers.
1292: addToDistributedTransactionListIfNeeded(request);
1293: return execRemoteStatementExecuteQuery(request);
1294: }
1295: }
1296: addToDistributedTransactionListIfNeeded(request);
1297: return distributedStatementExecuteQuery(request);
1298: }
1299:
1300: /**
1301: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteUpdate(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1302: */
1303: public ExecuteUpdateResult statementExecuteUpdate(
1304: AbstractWriteRequest request) throws SQLException {
1305: if (!request.isAutoCommit()) { /*
1306: * Add this transaction to the list of write transactions that needs to be
1307: * globally commited. This happens if the transaction has only be started
1308: * locally but not through a lazy start.
1309: */
1310: Long lTid = new Long(request.getTransactionId());
1311: synchronized (distributedTransactions) {
1312: if (!distributedTransactions.contains(lTid))
1313: distributedTransactions.add(lTid);
1314: }
1315: }
1316: return distributedStatementExecuteUpdate(request);
1317: }
1318:
1319: /**
1320: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteUpdateWithKeys(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1321: */
1322: public GeneratedKeysResult statementExecuteUpdateWithKeys(
1323: AbstractWriteRequest request) throws SQLException {
1324: if (!request.isAutoCommit()) { /*
1325: * Add this transaction to the list of write transactions that needs to be
1326: * globally commited. This happens if the transaction has only be started
1327: * locally but not through a lazy start.
1328: */
1329: Long lTid = new Long(request.getTransactionId());
1330: synchronized (distributedTransactions) {
1331: if (!distributedTransactions.contains(lTid))
1332: distributedTransactions.add(lTid);
1333: }
1334: }
1335: return distributedStatementExecuteUpdateWithKeys(request);
1336: }
1337:
1338: /**
1339: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecute(AbstractRequest)
1340: */
1341: public ExecuteResult statementExecute(AbstractRequest request)
1342: throws SQLException {
1343: if (!request.isAutoCommit()) { /*
1344: * Add this transaction to the list of write transactions that needs to be
1345: * globally commited. This happens if the transaction has only be started
1346: * locally but not through a lazy start.
1347: */
1348: Long lTid = new Long(request.getTransactionId());
1349: synchronized (distributedTransactions) {
1350: if (!distributedTransactions.contains(lTid))
1351: distributedTransactions.add(lTid);
1352: }
1353: }
1354: return distributedStatementExecute(request);
1355: }
1356:
1357: /**
1358: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#scheduleExecWriteRequest(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1359: */
1360: public void scheduleExecWriteRequest(AbstractWriteRequest request)
1361: throws SQLException {
1362: lazyTransactionStart(request);
1363: super .scheduleExecWriteRequest(request);
1364: }
1365:
1366: /**
1367: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecuteQuery(StoredProcedure)
1368: */
1369: public ControllerResultSet callableStatementExecuteQuery(
1370: StoredProcedure proc) throws SQLException {
1371: // Parse the query first to update the semantic information
1372: getParsingFromCacheOrParse(proc);
1373:
1374: // If procedure is read-only, we don't broadcast
1375: DatabaseProcedureSemantic semantic = proc.getSemantic();
1376: if (proc.isReadOnly()
1377: || ((semantic != null) && (semantic.isReadOnly()))) {
1378: try {
1379: proc.setIsReadOnly(true);
1380: return execLocallyCallableStatementExecuteQuery(proc);
1381: } catch (AllBackendsFailedException ignore) {
1382: // This failed locally, try it remotely
1383: } catch (SQLException e) {
1384: if (!(e instanceof NoMoreBackendException))
1385: throw e;
1386: // else this failed locally, try it remotely
1387: }
1388: }
1389:
1390: addToDistributedTransactionListIfNeeded(proc);
1391: return distributedCallableStatementExecuteQuery(proc);
1392: }
1393:
1394: /**
1395: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
1396: */
1397: public ExecuteUpdateResult callableStatementExecuteUpdate(
1398: StoredProcedure proc) throws SQLException {
1399: if (!proc.isAutoCommit()) { /*
1400: * Add this transaction to the list of write transactions that needs to be
1401: * globally commited. This happens if the transaction has only be started
1402: * locally but not through a lazy start.
1403: */
1404: Long lTid = new Long(proc.getTransactionId());
1405: synchronized (distributedTransactions) {
1406: if (!distributedTransactions.contains(lTid))
1407: distributedTransactions.add(lTid);
1408: }
1409: }
1410: return distributedCallableStatementExecuteUpdate(proc);
1411: }
1412:
1413: /**
1414: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecute(StoredProcedure)
1415: */
1416: public ExecuteResult callableStatementExecute(StoredProcedure proc)
1417: throws SQLException {
1418: // Parse the query first to update the semantic information
1419: getParsingFromCacheOrParse(proc);
1420:
1421: // If procedure is read-only, we don't broadcast
1422: DatabaseProcedureSemantic semantic = proc.getSemantic();
1423: if (proc.isReadOnly()
1424: || ((semantic != null) && (semantic.isReadOnly()))) {
1425: try {
1426: proc.setIsReadOnly(true);
1427: return execLocallyCallableStatementExecute(proc);
1428: } catch (AllBackendsFailedException ignore) {
1429: // This failed locally, try it remotely
1430: } catch (SQLException e) {
1431: if (!(e instanceof NoMoreBackendException))
1432: throw e;
1433: // else this failed locally, try it remotely
1434: }
1435: }
1436:
1437: if (!proc.isAutoCommit()) { /*
1438: * Add this transaction to the list of write transactions that needs to be
1439: * globally commited. This happens if the transaction has only be started
1440: * locally but not through a lazy start.
1441: */
1442: Long lTid = new Long(proc.getTransactionId());
1443: synchronized (distributedTransactions) {
1444: if (!distributedTransactions.contains(lTid))
1445: distributedTransactions.add(lTid);
1446: }
1447: }
1448: return distributedCallableStatementExecute(proc);
1449: }
1450:
1451: /**
1452: * Fetch the result of a previously executed request from a remote controller
1453: * failover cache.
1454: *
1455: * @param successfulControllers controllers to fetch the result from
1456: * @param id unique identifier of the query to look the result for
1457: * @return the request result
1458: * @throws NoResultAvailableException if no result could be retrieved from the
1459: * failover cache
1460: */
1461: protected Serializable getRequestResultFromFailoverCache(
1462: List successfulControllers, long id)
1463: throws NoResultAvailableException {
1464: List groupMembers = new ArrayList(1);
1465:
1466: // Try all members in turn and return as soon as one succeeds
1467: for (Iterator iter = successfulControllers.iterator(); iter
1468: .hasNext();) {
1469: Member remoteController = (Member) iter.next();
1470: groupMembers.clear();
1471: groupMembers.add(remoteController);
1472:
1473: if (logger.isDebugEnabled())
1474: logger.debug("Getting result for request " + id
1475: + " from controllers " + remoteController);
1476:
1477: try { // Send the request to that controller
1478: MulticastResponse response = dvdb
1479: .getMulticastRequestAdapter().multicastMessage(
1480: groupMembers,
1481: new GetRequestResultFromFailoverCache(
1482: id),
1483: MulticastRequestAdapter.WAIT_ALL, 0);
1484: Serializable result = response
1485: .getResult(remoteController);
1486:
1487: if ((result instanceof Exception)
1488: || (response.getFailedMembers() != null)) { // Failure on the remote controller
1489: if (logger.isInfoEnabled())
1490: logger
1491: .info(
1492: "Controller "
1493: + remoteController
1494: + " could not fetch result for request "
1495: + id,
1496: (Exception) result);
1497: } else
1498: return result;
1499: } catch (Exception e) {
1500: String msg = "An error occured while getching result for request "
1501: + id + " from controller " + remoteController;
1502: logger.warn(msg, e);
1503: }
1504: }
1505: throw new NoResultAvailableException(
1506: "All controllers failed when trying to fetch result for request "
1507: + id);
1508: }
1509:
1510: /**
1511: * Stores a result associated with a request in the request result failover
1512: * cache.
1513: * <p>
1514: * Only results for requests initiated on a remote controller are stored.
1515: *
1516: * @param request the request executed
1517: * @param result the result of the request
1518: * @return true if the result was added to the cache, false if the request was
1519: * local to this controller
1520: * @see org.continuent.sequoia.controller.virtualdatabase.RequestResultFailoverCache#store(AbstractRequest,
1521: * Serializable)
1522: */
1523: public boolean storeRequestResult(AbstractRequest request,
1524: Serializable result) {
1525: // Cache only results for requests initiated by a remote controller.
1526: if ((request.getId() & CONTROLLER_ID_BIT_MASK) != dvdb
1527: .getControllerId()) {
1528: dvdb.getRequestResultFailoverCache().store(request, result);
1529: return true;
1530: }
1531: return false;
1532: }
1533:
1534: //
1535: // RAIDb level specific methods
1536: //
1537:
/**
 * Distributed implementation of an abort. Called by
 * {@link #abort(long, boolean, boolean)} for transactions recorded in the
 * distributed (write) transaction list.
 *
 * @param login login that aborts the transaction
 * @param transactionId id of the transaction to abort
 * @throws SQLException if an error occurs
 */
public abstract void distributedAbort(String login,
long transactionId) throws SQLException;
1547:
/**
 * Distributed implementation of a commit. Called by
 * {@link #commit(long, boolean, boolean)} for transactions recorded in the
 * distributed (write) transaction list.
 *
 * @param login login that commits the transaction
 * @param transactionId id of the committing transaction
 * @throws SQLException if an error occurs
 */
public abstract void distributedCommit(String login,
long transactionId) throws SQLException;
1557:
/**
 * Distributed implementation of a rollback. Called by
 * {@link #rollback(long, boolean)} for transactions recorded in the
 * distributed (write) transaction list.
 *
 * @param login login that rolls back the transaction
 * @param transactionId id of the transaction to roll back
 * @throws SQLException if an error occurs
 */
public abstract void distributedRollback(String login,
long transactionId) throws SQLException;
1567:
/**
 * Distributed implementation of a rollback to a savepoint. Called by
 * {@link #rollback(long, String)} for transactions recorded in the
 * distributed (write) transaction list.
 *
 * @param login login that rolls back the transaction
 * @param transactionId id of the transaction
 * @param savepointName name of the savepoint to roll back to
 * @throws SQLException if an error occurs
 */
public abstract void distributedRollback(String login,
long transactionId, String savepointName)
throws SQLException;
1579:
/**
 * Distributed implementation of setting a savepoint to a transaction.
 *
 * @param login login that sets the savepoint
 * @param transactionId id of the transaction
 * @param name name of the savepoint to set
 * @throws SQLException if an error occurs
 */
public abstract void distributedSetSavepoint(String login,
long transactionId, String name) throws SQLException;
1590:
/**
 * Distributed implementation of releasing a savepoint from a transaction.
 *
 * @param login login that releases the savepoint
 * @param transactionId id of the transaction
 * @param name name of the savepoint to release
 * @throws SQLException if an error occurs
 */
public abstract void distributedReleaseSavepoint(String login,
long transactionId, String name) throws SQLException;
1601:
/**
 * Distributed implementation of a select request execution that returns a
 * ResultSet. Used by {@link #statementExecuteQuery(SelectRequest)} for
 * requests that must be broadcast.
 *
 * @param request request to execute
 * @return ResultSet produced by the select request
 * @throws SQLException if an error occurs
 */
public abstract ControllerResultSet distributedStatementExecuteQuery(
SelectRequest request) throws SQLException;
1612:
/**
 * Distributed implementation of a write request execution.
 *
 * @param request write request to execute
 * @return an <code>ExecuteUpdateResult</code> holding the outcome of the
 *         update (number of modified rows)
 * @throws SQLException if an error occurs
 */
public abstract ExecuteUpdateResult distributedStatementExecuteUpdate(
AbstractWriteRequest request) throws SQLException;
1622:
/**
 * Distributed implementation of a write request execution that returns
 * auto-generated keys.
 *
 * @param request write request to execute
 * @return update count and ResultSet containing the auto-generated keys
 * @throws SQLException if an error occurs
 */
public abstract GeneratedKeysResult distributedStatementExecuteUpdateWithKeys(
AbstractWriteRequest request) throws SQLException;
1633:
/**
 * Distributed implementation of a Statement.execute() execution.
 *
 * @param request request to execute
 * @return an <code>ExecuteResult</code> object
 * @throws SQLException if an error occurs
 */
public abstract ExecuteResult distributedStatementExecute(
AbstractRequest request) throws SQLException;
1643:
/**
 * Distributed implementation of a stored procedure
 * CallableStatement.executeQuery() execution. Used by
 * {@link #callableStatementExecuteQuery(StoredProcedure)} when the procedure
 * is not read-only or when its local execution could not be completed.
 *
 * @param proc stored procedure to execute
 * @return ResultSet corresponding to this stored procedure execution
 * @throws SQLException if an error occurs
 */
public abstract ControllerResultSet distributedCallableStatementExecuteQuery(
StoredProcedure proc) throws SQLException;
1654:
/**
 * Distributed implementation of a stored procedure
 * CallableStatement.executeUpdate() execution.
 *
 * @param proc stored procedure to execute
 * @return an <code>ExecuteUpdateResult</code> holding the outcome of the
 *         update (number of modified rows)
 * @throws SQLException if an error occurs
 */
public abstract ExecuteUpdateResult distributedCallableStatementExecuteUpdate(
StoredProcedure proc) throws SQLException;
1665:
/**
 * Distributed implementation of a stored procedure
 * CallableStatement.execute() execution. Used by
 * {@link #callableStatementExecute(StoredProcedure)} when the procedure is
 * not read-only or when its local execution could not be completed.
 *
 * @param proc stored procedure to execute
 * @return an <code>ExecuteResult</code> object
 * @throws SQLException if an error occurs
 */
public abstract ExecuteResult distributedCallableStatementExecute(
StoredProcedure proc) throws SQLException;
1676:
1677: /**
1678: * Once the request has been dispatched, it can be executed using the code
1679: * from <code>RequestManager</code>
1680: *
1681: * @param proc stored procedure to execute
1682: * @return ResultSet corresponding to this stored procedure execution
1683: * @throws AllBackendsFailedException if all backends failed to execute the
1684: * stored procedure
1685: * @throws SQLException if an error occurs
1686: */
1687: public ControllerResultSet execLocallyCallableStatementExecuteQuery(
1688: StoredProcedure proc) throws AllBackendsFailedException,
1689: SQLException {
1690: return super .callableStatementExecuteQuery(proc);
1691: }
1692:
1693: /**
1694: * Once the request has been dispatched, it can be executed using the code
1695: * from <code>RequestManager</code>
1696: *
1697: * @param proc stored procedure to execute
1698: * @return ExecuteResult corresponding to this stored procedure execution
1699: * @throws AllBackendsFailedException if all backends failed to execute the
1700: * stored procedure
1701: * @throws SQLException if an error occurs
1702: */
1703: public ExecuteResult execLocallyCallableStatementExecute(
1704: StoredProcedure proc) throws AllBackendsFailedException,
1705: SQLException {
1706: return super .callableStatementExecute(proc);
1707: }
1708:
1709: /**
1710: * Test if a transaction has been started on this controller, but initialized
1711: * by a remote controller.
1712: *
1713: * @param currentTid Current transaction Id
1714: * @return True if this transaction of Id currentId has already been started
1715: * on the current controller
1716: */
1717: public boolean isDistributedTransaction(long currentTid) {
1718: synchronized (distributedTransactions) {
1719: return distributedTransactions
1720: .contains(new Long(currentTid));
1721: }
1722: }
1723:
1724: /**
1725: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#resumeActivity()
1726: */
1727: public void resumeActivity() {
1728: // Perform the distributed call through the group-comm, in order to
1729: // resume all the activity suspended above.
1730: try {
1731: // Suspend transactions
1732: dvdb.getMulticastRequestAdapter().multicastMessage(
1733: dvdb.getAllMembers(),
1734: new ResumeActivity(),
1735: MulticastRequestAdapter.WAIT_ALL,
1736: dvdb.getMessageTimeouts()
1737: .getDisableBackendTimeout());
1738: logger.info("All activity is now resumed for "
1739: + dvdb.getDatabaseName());
1740: } catch (Exception e) {
1741: String msg = "Error while resuming activity";
1742: logger.error(msg, e);
1743: }
1744: }
1745:
1746: /**
1747: * @see org.continuent.sequoia.controller.requestmanager.RequestManager#suspendActivity()
1748: */
1749: public void suspendActivity() throws SQLException {
1750: // Perform the distributed call through the group-comm, in order to
1751: // atomically suspend all activity on the system.
1752: try {
1753: // Suspend transactions
1754: dvdb.getMulticastRequestAdapter().multicastMessage(
1755: dvdb.getAllMembers(),
1756: new SuspendActivity(),
1757: MulticastRequestAdapter.WAIT_ALL,
1758: dvdb.getMessageTimeouts()
1759: .getDisableBackendTimeout());
1760: logger.info("All activity is suspended for "
1761: + dvdb.getDatabaseName());
1762: } catch (Exception e) {
1763: String msg = "Error while suspending activity";
1764: logger.error(msg, e);
1765: throw (SQLException) new SQLException(msg + "(" + e + ")")
1766: .initCause(e);
1767: }
1768: }
1769:
1770: /**
1771: * Suspend all transactions, writes and persistent connections and
1772: * <strong>do not wait for completion of in-flight transactions
1773: * and/or persistent connections</strong>.
1774: * <p>
1775: * This method blocks activity in the cluster and should not be called in
1776: * normal cluster situation. It should be reserved for extraordinary
1777: * situation such as network partition detection & reconciliation.
1778: * </p>
1779: *
1780: * @throws SQLException if an error occured
1781: */
1782: public void blockActivity() throws SQLException {
1783: try {
1784: dvdb.getMulticastRequestAdapter().multicastMessage(
1785: dvdb.getAllMembers(),
1786: new BlockActivity(),
1787: MulticastRequestAdapter.WAIT_ALL,
1788: dvdb.getMessageTimeouts()
1789: .getDisableBackendTimeout());
1790: logger.info("All activity is blocked for "
1791: + dvdb.getDatabaseName());
1792: } catch (Exception e) {
1793: String msg = "Error while blocking activity";
1794: logger.error(msg, e);
1795: throw (SQLException) new SQLException(msg + "(" + e + ")")
1796: .initCause(e);
1797: }
1798: }
1799:
1800: /**
1801: * Notify controllers that they are now inconsistent with the cluster and that
1802: * they sould disable themselves.
1803: *
1804: * @param request request that generated the consistency
1805: * @param inconsistentControllers controllers that need to be notified
1806: * @throws SQLException if an error occurs
1807: */
1808: protected void notifyControllerInconsistency(
1809: AbstractRequest request, ArrayList inconsistentControllers)
1810: throws SQLException {
1811: try {
1812: dvdb.getMulticastRequestAdapter().multicastMessage(
1813: inconsistentControllers,
1814: new NotifyInconsistency(request),
1815: MulticastRequestAdapter.WAIT_ALL, 0);
1816: } catch (Exception e) {
1817: String msg = "An error occured while notifying controllers ("
1818: + inconsistentControllers
1819: + ") of inconsistency due to distributed request "
1820: + request.getId();
1821: logger.warn(msg, e);
1822: throw new SQLException(msg + " (" + e + ")");
1823: }
1824: }
1825:
1826: /**
1827: * Notify a set of backends of the query completion.
1828: *
1829: * @param request the request that has completed
1830: * @param success true if the request has successfully completed
1831: * @param disableBackendOnSuccess disable all local backends if query was
1832: * successful (but failed locally). Usually set to true in case of
1833: * AllBackendsFailedException and false for NoMoreBackendException.
1834: * @param backendsToNotify list of backends to notify (returns right away if
1835: * the list is null)
1836: * @throws SQLException if an error occurs
1837: */
1838: protected void notifyRequestCompletion(AbstractRequest request,
1839: boolean success, boolean disableBackendOnSuccess,
1840: ArrayList backendsToNotify) throws SQLException {
1841: if (backendsToNotify == null)
1842: return;
1843: try {
1844: dvdb.getMulticastRequestAdapter().multicastMessage(
1845: backendsToNotify,
1846: new NotifyCompletion(request, success,
1847: disableBackendOnSuccess),
1848: MulticastRequestAdapter.WAIT_ALL,
1849: dvdb.getMessageTimeouts()
1850: .getNotifyCompletionTimeout());
1851: } catch (Exception e) {
1852: String msg = "An error occured while notifying all controllers of failure of distributed request "
1853: + request.getId();
1854: logger.warn(msg, e);
1855: throw new SQLException(msg + " (" + e + ")");
1856: }
1857: }
1858:
1859: /**
1860: * Notify a set of backends of the query completion.
1861: *
1862: * @param request the request that has completed
1863: * @param success true if the request has successfully completed
1864: * @param disableBackendOnSuccess disable all local backends if query was
1865: * successful (but failed locally). Usually set to true in case of
1866: * AllBackendsFailedException and false for NoMoreBackendException.
1867: * @param backendsToNotify list of backends to notify (returns right away if
1868: * the list is null)
1869: * @param requestUpdateCount the request update count to be logged if it
1870: * succeeded somewhere
1871: * @throws SQLException
1872: */
1873: protected void notifyRequestCompletion(AbstractRequest request,
1874: boolean success, boolean disableBackendOnSuccess,
1875: ArrayList backendsToNotify, int requestUpdateCount)
1876: throws SQLException {
1877: if (backendsToNotify == null)
1878: return;
1879: try {
1880: dvdb.getMulticastRequestAdapter()
1881: .multicastMessage(
1882: backendsToNotify,
1883: new NotifyCompletion(request, success,
1884: disableBackendOnSuccess,
1885: requestUpdateCount),
1886: MulticastRequestAdapter.WAIT_ALL,
1887: dvdb.getMessageTimeouts()
1888: .getNotifyCompletionTimeout());
1889: } catch (Exception e) {
1890: String msg = "An error occured while notifying all controllers of failure of distributed request "
1891: + request.getId();
1892: logger.warn(msg, e);
1893: throw new SQLException(msg + " (" + e + ")");
1894: }
1895: }
1896:
1897: /**
1898: * Cleanup all queries from a given transaction that were registered as failed
1899: * and issued from the other controller. This is used when a controller has
1900: * failed and no status information will be returned for failed queries.
1901: * Queries are systematically tagged as failed. This method is called only for
1902: * failover during a rollback / abort to properly close the transaction on the
1903: * remaining controller.
1904: *
1905: * @param tId the transaction id that we are looking for
1906: */
1907: public void cleanupRollbackFromOtherController(long tId) {
1908: long cid = this .getControllerId();
1909: synchronized (failedOnAllBackends) {
1910: for (Iterator iter = failedOnAllBackends.keySet()
1911: .iterator(); iter.hasNext();) {
1912: AbstractRequest request = (AbstractRequest) iter.next();
1913: if (((request.getId() & CONTROLLER_ID_BIT_MASK) != cid)
1914: || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) != cid)
1915: && request.getTransactionId() == tId) { // Need to remove that entry
1916: if (logger.isInfoEnabled())
1917: logger
1918: .info("Failover while rollbacking the transaction "
1919: + tId
1920: + " detected. No status information received for request "
1921: + request
1922: + ", considering status as failed.");
1923: FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
1924: .get(request);
1925:
1926: // If the current request is a rollback / abort, we need to call
1927: // logRequestCompletionAndNotifyScheduler with success set to true
1928: // for the transaction to be correctly cleaned up
1929: boolean isAbortOrRollback = (request instanceof UnknownWriteRequest)
1930: && ("rollback".equals(request
1931: .getSqlOrTemplate()) || "abort"
1932: .equals(request.getSqlOrTemplate()));
1933:
1934: logRequestCompletionAndNotifyScheduler(request,
1935: isAbortOrRollback, failureInfo, -1);
1936: iter.remove();
1937: }
1938: }
1939: }
1940: }
1941: }
|