0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Contact: sequoia@continuent.org
0006: *
0007: * Licensed under the Apache License, Version 2.0 (the "License");
0008: * you may not use this file except in compliance with the License.
0009: * You may obtain a copy of the License at
0010: *
0011: * http://www.apache.org/licenses/LICENSE-2.0
0012: *
0013: * Unless required by applicable law or agreed to in writing, software
0014: * distributed under the License is distributed on an "AS IS" BASIS,
0015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0016: * See the License for the specific language governing permissions and
0017: * limitations under the License.
0018: *
0019: * Initial developer(s): Emmanuel Cecchet.
0020: * Contributor(s): Jean-Bernard van Zuylen, Damian Arregui.
0021: */package org.continuent.sequoia.controller.requestmanager.distributed;
0022:
0023: import java.io.Serializable;
0024: import java.sql.SQLException;
0025: import java.util.ArrayList;
0026: import java.util.Iterator;
0027: import java.util.List;
0028:
0029: import javax.management.NotCompliantMBeanException;
0030:
0031: import org.continuent.hedera.adapters.MulticastRequestAdapter;
0032: import org.continuent.hedera.adapters.MulticastResponse;
0033: import org.continuent.hedera.common.Member;
0034: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0035: import org.continuent.sequoia.common.exceptions.NoResultAvailableException;
0036: import org.continuent.sequoia.common.i18n.Translate;
0037: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0038: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0039: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0040: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0041: import org.continuent.sequoia.controller.cache.result.AbstractResultCache;
0042: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0043: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0044: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0045: import org.continuent.sequoia.controller.requests.AbstractRequest;
0046: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0047: import org.continuent.sequoia.controller.requests.SelectRequest;
0048: import org.continuent.sequoia.controller.requests.StoredProcedure;
0049: import org.continuent.sequoia.controller.requests.StoredProcedureCallResult;
0050: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
0051: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0052: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
0053: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedAbort;
0054: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCallableStatementExecute;
0055: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCallableStatementExecuteQuery;
0056: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCallableStatementExecuteUpdate;
0057: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0058: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0059: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest;
0060: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0061: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0062: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0063: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecute;
0064: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecuteQuery;
0065: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecuteUpdate;
0066: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecuteUpdateWithKeys;
0067: import org.continuent.sequoia.controller.virtualdatabase.protocol.ExecRemoteStatementExecuteQuery;
0068: import org.continuent.sequoia.controller.virtualdatabase.protocol.MessageTimeouts;
0069:
0070: /**
0071: * This class defines a RAIDb2DistributedRequestManager
0072: *
0073: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0074: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0075: * </a>
0076: * @author <a href="mailto:Damian.Arregui@emicnetworks.com">Damian Arregui</a>
0077: * @version 1.0
0078: */
0079: public class RAIDb2DistributedRequestManager extends
0080: DistributedRequestManager {
0081:
/**
 * Builds a <code>RAIDb2DistributedRequestManager</code>. All arguments are
 * forwarded unchanged to the <code>DistributedRequestManager</code>
 * constructor; this class adds no construction logic of its own.
 *
 * @param vdb the virtual database this request manager belongs to
 * @param scheduler the Request Scheduler to use
 * @param cache a Query Cache implementation
 * @param loadBalancer the Request Load Balancer to use
 * @param recoveryLog the Log Recovery to use
 * @param beginTimeout timeout in seconds for begin
 * @param commitTimeout timeout in seconds for commit
 * @param rollbackTimeout timeout in seconds for rollback
 * @throws SQLException if an error occurs
 * @throws NotCompliantMBeanException if the MBean is not JMX compliant
 */
public RAIDb2DistributedRequestManager(DistributedVirtualDatabase vdb,
        AbstractScheduler scheduler,
        AbstractResultCache cache,
        AbstractLoadBalancer loadBalancer,
        RecoveryLog recoveryLog,
        long beginTimeout,
        long commitTimeout,
        long rollbackTimeout)
        throws SQLException, NotCompliantMBeanException {
    super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
            commitTimeout, rollbackTimeout);
}
0105:
0106: /**
0107: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest)
0108: */
0109: public ControllerResultSet distributedStatementExecuteQuery(
0110: SelectRequest request) throws SQLException {
0111: return executeRequestReturningResultSet(request,
0112: new DistributedStatementExecuteQuery(request), dvdb
0113: .getMessageTimeouts()
0114: .getExecReadRequestTimeout());
0115: }
0116:
0117: /**
0118: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#execRemoteStatementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest)
0119: */
0120: public ControllerResultSet execRemoteStatementExecuteQuery(
0121: SelectRequest request) throws SQLException {
0122: if (dvdb.isProcessMacroBeforeBroadcast())
0123: loadBalancer.handleMacros(request);
0124: try {
0125: // Iterate over controllers in members list (but us) until someone
0126: // successfully executes our request.
0127: // TODO: We could improve by jumping the controller we know in advance
0128: // that they don't have the necessary tables.
0129: Iterator i = dvdb.getAllMemberButUs().iterator();
0130: SQLException error = null;
0131: while (i.hasNext()) {
0132: Member controller = (Member) i.next();
0133: List groupMembers = new ArrayList();
0134: groupMembers.add(controller);
0135:
0136: if (logger.isDebugEnabled())
0137: logger
0138: .debug("Sending request "
0139: + request.getSqlShortForm(dvdb
0140: .getSqlShortFormLength())
0141: + (request.isAutoCommit() ? ""
0142: : " transaction "
0143: + request
0144: .getTransactionId())
0145: + " to " + controller);
0146:
0147: // Send query to remote controller
0148: MulticastResponse responses;
0149: responses = dvdb.getMulticastRequestAdapter()
0150: .multicastMessage(
0151: groupMembers,
0152: new ExecRemoteStatementExecuteQuery(
0153: request),
0154: MulticastRequestAdapter.WAIT_ALL,
0155: MessageTimeouts.getMinTimeout(dvdb
0156: .getMessageTimeouts()
0157: .getExecReadRequestTimeout(),
0158: request.getTimeout() * 1000L));
0159:
0160: if (logger.isDebugEnabled())
0161: logger.debug("Request "
0162: + request.getSqlShortForm(dvdb
0163: .getSqlShortFormLength())
0164: + " completed.");
0165:
0166: Object ret = responses.getResult(controller);
0167: if (ret instanceof ControllerResultSet)
0168: return (ControllerResultSet) ret;
0169: else if (ret instanceof SQLException)
0170: error = (SQLException) ret;
0171: }
0172:
0173: if (error == null) {
0174: // No one answered, throw
0175: throw new NoMoreBackendException();
0176: } else
0177: throw error;
0178: } catch (Exception e) {
0179: String msg = "An error occured while executing remote select request "
0180: + request.getId();
0181: logger.warn(msg, e);
0182: if (e instanceof SQLException)
0183: throw (SQLException) e;
0184: else
0185: throw new SQLException(msg + " (" + e + ")");
0186: }
0187: }
0188:
0189: /**
0190: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecuteUpdate(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
0191: */
0192: public ExecuteUpdateResult distributedStatementExecuteUpdate(
0193: AbstractWriteRequest request) throws SQLException {
0194: return executeRequestReturningInt(request,
0195: new DistributedStatementExecuteUpdate(request), dvdb
0196: .getMessageTimeouts()
0197: .getExecWriteRequestTimeout());
0198: }
0199:
0200: /**
0201: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecuteUpdateWithKeys(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
0202: */
0203: public GeneratedKeysResult distributedStatementExecuteUpdateWithKeys(
0204: AbstractWriteRequest request) throws SQLException {
0205: return executeRequestReturningGeneratedKeys(request,
0206: new DistributedStatementExecuteUpdateWithKeys(request),
0207: dvdb.getMessageTimeouts()
0208: .getExecWriteRequestWithKeysTimeout());
0209: }
0210:
0211: /**
0212: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecute(org.continuent.sequoia.controller.requests.AbstractRequest)
0213: */
0214: public ExecuteResult distributedStatementExecute(
0215: AbstractRequest request) throws SQLException {
0216: return executeRequestReturningExecuteResult(request,
0217: new DistributedStatementExecute(request), dvdb
0218: .getMessageTimeouts()
0219: .getExecReadStoredProcedureTimeout());
0220: }
0221:
0222: /**
0223: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCallableStatementExecuteQuery(StoredProcedure)
0224: */
0225: public ControllerResultSet distributedCallableStatementExecuteQuery(
0226: StoredProcedure proc) throws SQLException {
0227: StoredProcedureCallResult result = executeRequestReturningStoredProcedureCallResult(
0228: proc,
0229: new DistributedCallableStatementExecuteQuery(proc),
0230: dvdb.getMessageTimeouts()
0231: .getExecReadStoredProcedureTimeout());
0232: return (ControllerResultSet) result.getResult();
0233: }
0234:
0235: /**
0236: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCallableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0237: */
0238: public ExecuteUpdateResult distributedCallableStatementExecuteUpdate(
0239: StoredProcedure proc) throws SQLException {
0240: StoredProcedureCallResult result = executeRequestReturningStoredProcedureCallResult(
0241: proc, new DistributedCallableStatementExecuteUpdate(
0242: proc), dvdb.getMessageTimeouts()
0243: .getExecWriteStoredProcedureTimeout());
0244: return ((ExecuteUpdateResult) result.getResult());
0245: }
0246:
0247: /**
0248: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCallableStatementExecute(StoredProcedure)
0249: */
0250: public ExecuteResult distributedCallableStatementExecute(
0251: StoredProcedure proc) throws SQLException {
0252: StoredProcedureCallResult result = executeRequestReturningStoredProcedureCallResult(
0253: proc, new DistributedCallableStatementExecute(proc),
0254: dvdb.getMessageTimeouts()
0255: .getExecReadStoredProcedureTimeout());
0256: return (ExecuteResult) result.getResult();
0257: }
0258:
0259: /**
0260: * Execute a distributed request that returns a GeneratedKeysResult.
0261: *
0262: * @param request the request to execute
0263: * @param requestMsg message to be sent through the group communication
0264: * @param messageTimeout group message timeout
0265: * @return the corresponding ExecuteResult
0266: * @throws SQLException if an error occurs
0267: */
0268: private ExecuteResult executeRequestReturningExecuteResult(
0269: AbstractRequest request, DistributedRequest requestMsg,
0270: long messageTimeout) throws SQLException {
0271: if (dvdb.isProcessMacroBeforeBroadcast())
0272: loadBalancer.handleMacros(request);
0273: try {
0274: ExecuteResult requestResult = null;
0275:
0276: List groupMembers = dvdb.getAllMembers();
0277:
0278: MulticastResponse responses = multicastRequest(request,
0279: requestMsg, messageTimeout, groupMembers);
0280:
0281: // List of controllers that gave a AllBackendsFailedException
0282: ArrayList failedOnAllBackends = null;
0283: // List of controllers that have no more backends to execute queries
0284: ArrayList controllersWithoutBackends = null;
0285: SQLException exception = null;
0286: int size = groupMembers.size();
0287: List successfulControllers = null;
0288: // Get the result of each controller
0289: for (int i = 0; i < size; i++) {
0290: Member member = (Member) groupMembers.get(i);
0291: if ((responses.getFailedMembers() != null)
0292: && responses.getFailedMembers()
0293: .contains(member)) {
0294: logger.warn("Controller " + member
0295: + " is suspected of failure.");
0296: continue;
0297: }
0298: Object r = responses.getResult(member);
0299: if (r instanceof ExecuteResult) {
0300: if (requestResult == null)
0301: requestResult = (ExecuteResult) r;
0302: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0303: .equals(r)) {
0304: // Remote controller success
0305: if (requestResult == null) {
0306: if (successfulControllers == null)
0307: successfulControllers = new ArrayList();
0308: successfulControllers.add(member);
0309: }
0310: } else if (r instanceof NoMoreBackendException) {
0311: if (controllersWithoutBackends == null)
0312: controllersWithoutBackends = new ArrayList();
0313: controllersWithoutBackends.add(member);
0314: if (logger.isDebugEnabled())
0315: logger
0316: .debug("Controller "
0317: + member
0318: + " has no more backends to execute query ("
0319: + r + ")");
0320: } else if (r instanceof Exception) {
0321: if (failedOnAllBackends == null)
0322: failedOnAllBackends = new ArrayList();
0323: failedOnAllBackends.add(member);
0324: String msg = "Request " + request.getId()
0325: + " failed on controller " + member + " ("
0326: + r + ")";
0327: logger.warn(msg);
0328: if (r instanceof SQLException)
0329: exception = (SQLException) r;
0330: else {
0331: exception = new SQLException(
0332: "Internal exception " + r);
0333: exception.initCause((Throwable) r);
0334: }
0335: } else {
0336: if (failedOnAllBackends == null)
0337: failedOnAllBackends = new ArrayList();
0338: failedOnAllBackends.add(member);
0339: if (logger.isWarnEnabled())
0340: logger
0341: .warn("Unexpected answer from controller "
0342: + member
0343: + " ("
0344: + r
0345: + ") for request "
0346: + request
0347: .getSqlShortForm(vdb
0348: .getSqlShortFormLength()));
0349: }
0350: }
0351:
0352: if ((requestResult == null)
0353: && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
0354: // remote controller
0355: try {
0356: requestResult = (ExecuteResult) getRequestResultFromFailoverCache(
0357: successfulControllers, request.getId());
0358: } catch (NoResultAvailableException e) {
0359: exception = new SQLException(
0360: "Request '"
0361: + request
0362: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
0363: }
0364: }
0365:
0366: if ((controllersWithoutBackends != null)
0367: || (failedOnAllBackends != null)) { // Notify all controllers of completion
0368: // Re-create fake stored procedure for proper scheduler notification
0369: // (see DistributedStatementExecute#scheduleRequest)
0370: StoredProcedure proc = new StoredProcedure(request
0371: .getSqlOrTemplate(), request
0372: .getEscapeProcessing(), request.getTimeout(),
0373: request.getLineSeparator());
0374: proc.setIsAutoCommit(request.isAutoCommit());
0375: proc.setTransactionId(request.getTransactionId());
0376: proc.setTransactionIsolation(request
0377: .getTransactionIsolation());
0378: proc.setId(request.getId());
0379:
0380: /*
0381: * Notify all controllers where all backend failed (if any) that
0382: * completion was 'success'. We determine success from the successful
0383: * controllers even though they may have failed afterwards (in which
0384: * case requestResult may be null).
0385: */
0386: boolean success = (requestResult != null || successfulControllers != null);
0387: notifyRequestCompletion(proc, success, true,
0388: failedOnAllBackends);
0389:
0390: // Notify controllers without active backends (if any)
0391: notifyRequestCompletion(proc, success, false,
0392: controllersWithoutBackends);
0393: }
0394:
0395: if (requestResult != null) {
0396: if (logger.isDebugEnabled())
0397: logger.debug("Request " + request.getId()
0398: + " completed successfully.");
0399: return requestResult;
0400: }
0401:
0402: // At this point, all controllers failed
0403:
0404: if (exception != null)
0405: throw exception;
0406: else {
0407: String msg = "Request '"
0408: + request.getSqlShortForm(vdb
0409: .getSqlShortFormLength())
0410: + "' failed on all controllers";
0411: logger.warn(msg);
0412: throw new SQLException(msg);
0413: }
0414: } catch (SQLException e) {
0415: String msg = Translate.get("loadbalancer.request.failed",
0416: new String[] {
0417: request.getSqlShortForm(vdb
0418: .getSqlShortFormLength()),
0419: e.getMessage() });
0420: logger.warn(msg);
0421: throw e;
0422: }
0423: }
0424:
0425: /**
0426: * Execute a distributed request that returns a GeneratedKeysResult.
0427: *
0428: * @param request the request to execute
0429: * @param requestMsg message to be sent through the group communication
0430: * @param messageTimeout group message timeout
0431: * @return the corresponding GeneratedKeysResult
0432: * @throws SQLException if an error occurs
0433: */
0434: private GeneratedKeysResult executeRequestReturningGeneratedKeys(
0435: AbstractRequest request, DistributedRequest requestMsg,
0436: long messageTimeout) throws SQLException {
0437: if (dvdb.isProcessMacroBeforeBroadcast())
0438: loadBalancer.handleMacros(request);
0439: try {
0440: GeneratedKeysResult requestResult = null;
0441:
0442: List groupMembers = dvdb.getAllMembers();
0443:
0444: MulticastResponse responses = multicastRequest(request,
0445: requestMsg, messageTimeout, groupMembers);
0446:
0447: // List of controllers that gave a AllBackendsFailedException
0448: ArrayList failedOnAllBackends = null;
0449: // List of controllers that gave an inconsistent result
0450: ArrayList inconsistentControllers = null;
0451: // List of controllers that have no more backends to execute queries
0452: ArrayList controllersWithoutBackends = null;
0453: SQLException exception = null;
0454: List successfulControllers = null;
0455: int size = groupMembers.size();
0456: // Get the result of each controller
0457: for (int i = 0; i < size; i++) {
0458: Member member = (Member) groupMembers.get(i);
0459: if ((responses.getFailedMembers() != null)
0460: && responses.getFailedMembers()
0461: .contains(member)) {
0462: logger.warn("Controller " + member
0463: + " is suspected of failure.");
0464: continue;
0465: }
0466: Object r = responses.getResult(member);
0467: if (r instanceof GeneratedKeysResult) {
0468: if (requestResult == null) {
0469: requestResult = (GeneratedKeysResult) r;
0470: } else {
0471: if (requestResult.getUpdateCount() != ((GeneratedKeysResult) r)
0472: .getUpdateCount()) {
0473: if (inconsistentControllers == null)
0474: inconsistentControllers = new ArrayList();
0475: inconsistentControllers.add(member);
0476: logger
0477: .error("Controller "
0478: + member
0479: + " returned an inconsistent results ("
0480: + requestResult
0481: + " when expecting " + r
0482: + ") for request "
0483: + request.getId());
0484: }
0485: }
0486: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0487: .equals(r)) {
0488: // Remote controller success
0489: if (requestResult == null) {
0490: if (successfulControllers == null)
0491: successfulControllers = new ArrayList();
0492: successfulControllers.add(member);
0493: }
0494: } else if (r instanceof NoMoreBackendException) {
0495: if (controllersWithoutBackends == null)
0496: controllersWithoutBackends = new ArrayList();
0497: controllersWithoutBackends.add(member);
0498: if (logger.isDebugEnabled())
0499: logger
0500: .debug("Controller "
0501: + member
0502: + " has no more backends to execute query ("
0503: + r + ")");
0504: } else if (r instanceof Exception) {
0505: if (failedOnAllBackends == null)
0506: failedOnAllBackends = new ArrayList();
0507: failedOnAllBackends.add(member);
0508: String msg = "Request " + request.getId()
0509: + " failed on controller " + member + " ("
0510: + r + ")";
0511: logger.warn(msg);
0512: if (r instanceof SQLException)
0513: exception = (SQLException) r;
0514: else {
0515: exception = new SQLException(
0516: "Internal exception " + r);
0517: exception.initCause((Throwable) r);
0518: }
0519: } else {
0520: if (failedOnAllBackends == null)
0521: failedOnAllBackends = new ArrayList();
0522: failedOnAllBackends.add(member);
0523: if (logger.isWarnEnabled())
0524: logger
0525: .warn("Unexpected answer from controller "
0526: + member
0527: + " ("
0528: + r
0529: + ") for request "
0530: + request
0531: .getSqlShortForm(vdb
0532: .getSqlShortFormLength()));
0533: }
0534: }
0535:
0536: if ((requestResult == null)
0537: && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
0538: // remote controller
0539: try {
0540: requestResult = (GeneratedKeysResult) getRequestResultFromFailoverCache(
0541: successfulControllers, request.getId());
0542: } catch (NoResultAvailableException e) {
0543: exception = new SQLException(
0544: "Request '"
0545: + request
0546: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
0547: }
0548: }
0549:
0550: /*
0551: * Notify all controllers where all backend failed (if any) that
0552: * completion was 'success'. We determine success from the successful
0553: * controllers even though they may have failed afterwards (in which case
0554: * requestResult may be null).
0555: */
0556: boolean success = (requestResult != null || successfulControllers != null);
0557: notifyRequestCompletion(request, success, true,
0558: failedOnAllBackends);
0559:
0560: // Notify controllers without active backends (if any)
0561: notifyRequestCompletion(request, success, false,
0562: controllersWithoutBackends);
0563:
0564: if (inconsistentControllers != null) { // Notify all inconsistent controllers to disable their backends
0565: notifyControllerInconsistency(request,
0566: inconsistentControllers);
0567: }
0568:
0569: if (requestResult != null) {
0570: if (logger.isDebugEnabled())
0571: logger.debug("Request " + request.getId()
0572: + " completed successfully.");
0573: return requestResult;
0574: }
0575:
0576: // At this point, all controllers failed
0577:
0578: if (exception != null)
0579: throw exception;
0580: else {
0581: String msg = "Request '" + request
0582: + "' failed on all controllers";
0583: logger.warn(msg);
0584: throw new SQLException(msg);
0585: }
0586: } catch (SQLException e) {
0587: String msg = Translate.get("loadbalancer.request.failed",
0588: new String[] {
0589: request.getSqlShortForm(vdb
0590: .getSqlShortFormLength()),
0591: e.getMessage() });
0592: logger.warn(msg);
0593: throw e;
0594: }
0595: }
0596:
0597: /**
0598: * Execute a distributed request that returns a ResultSet.
0599: *
0600: * @param request the request to execute
0601: * @param requestMsg message to be sent through the group communication
0602: * @param messageTimeout group message timeout
0603: * @return the corresponding ResultSet
0604: * @throws SQLException if an error occurs
0605: */
0606: private ExecuteUpdateResult executeRequestReturningInt(
0607: AbstractRequest request, DistributedRequest requestMsg,
0608: long messageTimeout) throws SQLException {
0609: if (dvdb.isProcessMacroBeforeBroadcast())
0610: loadBalancer.handleMacros(request);
0611: try {
0612: ExecuteUpdateResult requestResult = null;
0613:
0614: List groupMembers = dvdb.getAllMembers();
0615:
0616: MulticastResponse responses = multicastRequest(request,
0617: requestMsg, messageTimeout, groupMembers);
0618:
0619: // List of controllers that gave a AllBackendsFailedException
0620: ArrayList failedOnAllBackends = null;
0621: // List of controllers that gave an inconsistent result
0622: ArrayList inconsistentControllers = null;
0623: // List of controllers that have no more backends to execute queries
0624: ArrayList controllersWithoutBackends = null;
0625: SQLException exception = null;
0626: int size = groupMembers.size();
0627: // Get the result of each controller
0628: for (int i = 0; i < size; i++) {
0629: Member member = (Member) groupMembers.get(i);
0630: if ((responses.getFailedMembers() != null)
0631: && responses.getFailedMembers()
0632: .contains(member)) {
0633: logger.warn("Controller " + member
0634: + " is suspected of failure.");
0635: continue;
0636: }
0637: Object r = responses.getResult(member);
0638: if (r instanceof ExecuteUpdateResult) {
0639: if (requestResult == null)
0640: requestResult = (ExecuteUpdateResult) r;
0641: else {
0642: if (requestResult.getUpdateCount() != ((ExecuteUpdateResult) r)
0643: .getUpdateCount()) {
0644: if (inconsistentControllers == null)
0645: inconsistentControllers = new ArrayList();
0646: inconsistentControllers.add(member);
0647: logger
0648: .error("Controller "
0649: + member
0650: + " returned an inconsistent results ("
0651: + requestResult
0652: .getUpdateCount()
0653: + " when expecting "
0654: + ((ExecuteUpdateResult) r)
0655: .getUpdateCount()
0656: + ") for request "
0657: + request.getId());
0658: }
0659: }
0660: } else if (r instanceof NoMoreBackendException) {
0661: if (controllersWithoutBackends == null)
0662: controllersWithoutBackends = new ArrayList();
0663: controllersWithoutBackends.add(member);
0664: if (logger.isDebugEnabled())
0665: logger
0666: .debug("Controller "
0667: + member
0668: + " has no more backends to execute query ("
0669: + r + ")");
0670: } else if (r instanceof Exception) {
0671: if (failedOnAllBackends == null)
0672: failedOnAllBackends = new ArrayList();
0673: failedOnAllBackends.add(member);
0674: String msg = "Request " + request.getId()
0675: + " failed on controller " + member + " ("
0676: + r + ")";
0677: logger.warn(msg);
0678: if (r instanceof SQLException)
0679: exception = (SQLException) r;
0680: else {
0681: exception = new SQLException(
0682: "Internal exception " + r);
0683: exception.initCause((Throwable) r);
0684: }
0685: } else {
0686: if (failedOnAllBackends == null)
0687: failedOnAllBackends = new ArrayList();
0688: failedOnAllBackends.add(member);
0689: if (logger.isWarnEnabled())
0690: logger
0691: .warn("Unexpected answer from controller "
0692: + member
0693: + " ("
0694: + r
0695: + ") for request "
0696: + request
0697: .getSqlShortForm(vdb
0698: .getSqlShortFormLength()));
0699: }
0700: }
0701:
0702: // Notify all controllers where all backends failed (if any) that
0703: // completion was 'success'
0704: notifyRequestCompletion(request, requestResult != null,
0705: true, failedOnAllBackends);
0706:
0707: // Notify controllers without active backends (if any)
0708: notifyRequestCompletion(request, requestResult != null,
0709: false, controllersWithoutBackends);
0710:
0711: if (inconsistentControllers != null) { // Notify all inconsistent controllers to disable their backends
0712: notifyControllerInconsistency(request,
0713: inconsistentControllers);
0714: }
0715:
0716: if (requestResult != null) {
0717: if (logger.isDebugEnabled())
0718: logger.debug("Request " + request.getId()
0719: + " completed successfully.");
0720: return requestResult;
0721: }
0722:
0723: // At this point, all controllers failed
0724:
0725: if (exception != null)
0726: throw exception;
0727: else {
0728: String msg = "Request '" + request
0729: + "' failed on all controllers";
0730: logger.warn(msg);
0731: throw new SQLException(msg);
0732: }
0733: } catch (SQLException e) {
0734: String msg = Translate.get("loadbalancer.request.failed",
0735: new String[] {
0736: request.getSqlShortForm(vdb
0737: .getSqlShortFormLength()),
0738: e.getMessage() });
0739: logger.warn(msg);
0740: throw e;
0741: }
0742: }
0743:
0744: /**
0745: * Execute a distributed request that returns a ResultSet.
0746: *
0747: * @param request the request to execute
0748: * @param requestMsg message to be sent through the group communication
0749: * @param messageTimeout group message timeout
0750: * @return the corresponding ResultSet
0751: * @throws SQLException if an error occurs
0752: */
0753: private ControllerResultSet executeRequestReturningResultSet(
0754: AbstractRequest request, DistributedRequest requestMsg,
0755: long messageTimeout) throws SQLException {
0756: if (dvdb.isProcessMacroBeforeBroadcast())
0757: loadBalancer.handleMacros(request);
0758: try {
0759: ControllerResultSet requestResult = null;
0760:
0761: List groupMembers = dvdb.getAllMembers();
0762:
0763: MulticastResponse responses = multicastRequest(request,
0764: requestMsg, messageTimeout, groupMembers);
0765:
0766: // List of controllers that gave a AllBackendsFailedException
0767: ArrayList failedOnAllBackends = null;
0768: // List of controllers that have no more backends to execute queries
0769: ArrayList controllersWithoutBackends = null;
0770: SQLException exception = null;
0771: int size = groupMembers.size();
0772: List successfulControllers = null;
0773: // Get the result of each controller
0774: for (int i = 0; i < size; i++) {
0775: Member member = (Member) groupMembers.get(i);
0776: if ((responses.getFailedMembers() != null)
0777: && responses.getFailedMembers()
0778: .contains(member)) {
0779: logger.warn("Controller " + member
0780: + " is suspected of failure.");
0781: continue;
0782: }
0783: Object r = responses.getResult(member);
0784: if (r instanceof ControllerResultSet) {
0785: if (requestResult == null)
0786: requestResult = (ControllerResultSet) r;
0787: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0788: .equals(r)) {
0789: // Remote controller success
0790: if (requestResult == null) {
0791: if (successfulControllers == null)
0792: successfulControllers = new ArrayList();
0793: successfulControllers.add(member);
0794: }
0795: } else if (r instanceof NoMoreBackendException) {
0796: if (controllersWithoutBackends == null)
0797: controllersWithoutBackends = new ArrayList();
0798: controllersWithoutBackends.add(member);
0799: if (logger.isDebugEnabled())
0800: logger
0801: .debug("Controller "
0802: + member
0803: + " has no more backends to execute query ("
0804: + r + ")");
0805: } else if (r instanceof Exception) {
0806: if (failedOnAllBackends == null)
0807: failedOnAllBackends = new ArrayList();
0808: failedOnAllBackends.add(member);
0809: String msg = "Request " + request.getId()
0810: + " failed on controller " + member + " ("
0811: + r + ")";
0812: logger.warn(msg);
0813: if (r instanceof SQLException)
0814: exception = (SQLException) r;
0815: else {
0816: exception = new SQLException(
0817: "Internal exception " + r);
0818: exception.initCause((Throwable) r);
0819: }
0820: } else {
0821: if (failedOnAllBackends == null)
0822: failedOnAllBackends = new ArrayList();
0823: failedOnAllBackends.add(member);
0824: if (logger.isWarnEnabled())
0825: logger
0826: .warn("Unexpected answer from controller "
0827: + member
0828: + " ("
0829: + r
0830: + ") for request "
0831: + request
0832: .getSqlShortForm(vdb
0833: .getSqlShortFormLength()));
0834: }
0835: }
0836:
0837: if ((requestResult == null)
0838: && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
0839: // remote controller
0840: try {
0841: requestResult = (ControllerResultSet) getRequestResultFromFailoverCache(
0842: successfulControllers, request.getId());
0843: } catch (NoResultAvailableException e) {
0844: exception = new SQLException(
0845: "Request '"
0846: + request
0847: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
0848: }
0849: }
0850:
0851: /*
0852: * Notify all controllers where all backend failed (if any) that
0853: * completion was 'success'. We determine success from the successful
0854: * controllers even though they may have failed afterwards (in which case
0855: * requestResult may be null).
0856: */
0857: boolean success = (requestResult != null || successfulControllers != null);
0858: notifyRequestCompletion(request, success, true,
0859: failedOnAllBackends);
0860:
0861: // Notify controllers without active backends (if any)
0862: notifyRequestCompletion(request, success, false,
0863: controllersWithoutBackends);
0864:
0865: if (requestResult != null) {
0866: if (logger.isDebugEnabled())
0867: logger.debug("Request " + request.getId()
0868: + " completed successfully.");
0869: return requestResult;
0870: }
0871:
0872: // At this point, all controllers failed
0873:
0874: if (exception != null)
0875: throw exception;
0876: else {
0877: String msg = "Request '" + request
0878: + "' failed on all controllers";
0879: logger.warn(msg);
0880: throw new SQLException(msg);
0881: }
0882: } catch (SQLException e) {
0883: String msg = Translate.get("loadbalancer.request.failed",
0884: new String[] {
0885: request.getSqlShortForm(vdb
0886: .getSqlShortFormLength()),
0887: e.getMessage() });
0888: logger.warn(msg);
0889: throw e;
0890: }
0891: }
0892:
0893: /**
0894: * Execute a distributed request (usually a stored procedure) that returns a
0895: * StoredProcedureCallResult.
0896: *
0897: * @param proc the stored procedure to execute
0898: * @param requestMsg message to be sent through the group communication
0899: * @param messageTimeout group message timeout
0900: * @return the corresponding StoredProcedureCallResult
0901: * @throws SQLException if an error occurs
0902: */
0903: private StoredProcedureCallResult executeRequestReturningStoredProcedureCallResult(
0904: StoredProcedure proc, DistributedRequest requestMsg,
0905: long messageTimeout) throws SQLException {
0906: if (dvdb.isProcessMacroBeforeBroadcast())
0907: loadBalancer.handleMacros(proc);
0908: try {
0909: StoredProcedureCallResult result = null;
0910:
0911: List groupMembers = dvdb.getAllMembers();
0912:
0913: MulticastResponse responses = multicastRequest(proc,
0914: requestMsg, messageTimeout, groupMembers);
0915:
0916: // List of controllers that gave a AllBackendsFailedException
0917: ArrayList failedOnAllBackends = null;
0918: // List of controllers that have no more backends to execute queries
0919: ArrayList controllersWithoutBackends = null;
0920: SQLException exception = null;
0921: int size = groupMembers.size();
0922: List successfulControllers = null;
0923: // Get the result of each controller
0924: for (int i = 0; i < size; i++) {
0925: Member member = (Member) groupMembers.get(i);
0926: if ((responses.getFailedMembers() != null)
0927: && responses.getFailedMembers()
0928: .contains(member)) {
0929: logger.warn("Controller " + member
0930: + " is suspected of failure.");
0931: continue;
0932: }
0933: Object r = responses.getResult(member);
0934: if (r instanceof StoredProcedureCallResult) {
0935: if (result == null)
0936: result = (StoredProcedureCallResult) r;
0937: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0938: .equals(r)) {
0939: // Remote controller success
0940: if (result == null) {
0941: if (successfulControllers == null)
0942: successfulControllers = new ArrayList();
0943: successfulControllers.add(member);
0944: }
0945: } else if (r instanceof NoMoreBackendException) {
0946: if (controllersWithoutBackends == null)
0947: controllersWithoutBackends = new ArrayList();
0948: controllersWithoutBackends.add(member);
0949: if (logger.isDebugEnabled())
0950: logger
0951: .debug("Controller "
0952: + member
0953: + " has no more backends to execute query ("
0954: + r + ")");
0955: } else if (r instanceof Exception) {
0956: if (failedOnAllBackends == null)
0957: failedOnAllBackends = new ArrayList();
0958: failedOnAllBackends.add(member);
0959: String msg = "Request " + proc.getId()
0960: + " failed on controller " + member + " ("
0961: + r + ")";
0962: logger.warn(msg);
0963: if (r instanceof SQLException)
0964: exception = (SQLException) r;
0965: else {
0966: exception = new SQLException(
0967: "Internal exception " + r);
0968: exception.initCause((Throwable) r);
0969: }
0970: } else {
0971: if (failedOnAllBackends == null)
0972: failedOnAllBackends = new ArrayList();
0973: failedOnAllBackends.add(member);
0974: if (logger.isWarnEnabled())
0975: logger
0976: .warn("Unexpected answer from controller "
0977: + member
0978: + " ("
0979: + r
0980: + ") for request "
0981: + proc
0982: .getSqlShortForm(vdb
0983: .getSqlShortFormLength()));
0984: }
0985: }
0986:
0987: if ((result == null) && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
0988: // remote controller
0989: try {
0990: result = (StoredProcedureCallResult) getRequestResultFromFailoverCache(
0991: successfulControllers, proc.getId());
0992: } catch (NoResultAvailableException e) {
0993: exception = new SQLException(
0994: "Request '"
0995: + proc
0996: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
0997: }
0998: }
0999:
1000: /*
1001: * Notify all controllers where all backend failed (if any) that
1002: * completion was 'success'. We determine success from the successful
1003: * controllers even though they may have failed afterwards (in which case
1004: * requestResult may be null).
1005: */
1006: boolean success = (result != null || successfulControllers != null);
1007: notifyRequestCompletion(proc, success, true,
1008: failedOnAllBackends);
1009:
1010: // Notify controllers without active backends (if any)
1011: notifyRequestCompletion(proc, success, false,
1012: controllersWithoutBackends);
1013:
1014: if (result != null) {
1015: proc.copyNamedAndOutParameters(result
1016: .getStoredProcedure());
1017: if (logger.isDebugEnabled())
1018: logger.debug("Request " + proc.getId()
1019: + " completed successfully.");
1020: return result;
1021: }
1022:
1023: // At this point, all controllers failed
1024:
1025: if (exception != null)
1026: throw exception;
1027: else {
1028: String msg = "Request '" + proc
1029: + "' failed on all controllers";
1030: logger.warn(msg);
1031: throw new SQLException(msg);
1032: }
1033: } catch (SQLException e) {
1034: String msg = Translate.get("loadbalancer.request.failed",
1035: new String[] {
1036: proc.getSqlShortForm(vdb
1037: .getSqlShortFormLength()),
1038: e.getMessage() });
1039: logger.warn(msg);
1040: throw e;
1041: }
1042: }
1043:
1044: /**
1045: * Multicast a request to a set of controllers.
1046: *
1047: * @param request the request that is executed
1048: * @param requestMsg the group message to send
1049: * @param messageTimeout timeout on the group message
1050: * @param groupMembers list of controllers to send the message to
1051: * @return the responses to the multicast
1052: * @throws SQLException if an error occurs
1053: */
1054: private MulticastResponse multicastRequest(AbstractRequest request,
1055: Serializable requestMsg, long messageTimeout,
1056: List groupMembers) throws SQLException {
1057: if (logger.isDebugEnabled())
1058: logger.debug("Broadcasting request "
1059: + request.getSqlShortForm(dvdb
1060: .getSqlShortFormLength())
1061: + (request.isAutoCommit() ? "" : " transaction "
1062: + request.getTransactionId())
1063: + " to all controllers ("
1064: + dvdb.getChannel().getLocalMembership() + "->"
1065: + groupMembers.toString() + ")");
1066:
1067: // Send the query to everybody including us
1068: MulticastResponse responses;
1069: try {
1070: // Warning! Message timeouts are in ms but request timeout is in seconds
1071: responses = dvdb.getMulticastRequestAdapter()
1072: .multicastMessage(
1073: groupMembers,
1074: requestMsg,
1075: MulticastRequestAdapter.WAIT_ALL,
1076: MessageTimeouts.getMinTimeout(
1077: messageTimeout, request
1078: .getTimeout() * 1000L));
1079: } catch (Exception e) {
1080: String msg = "An error occured while executing distributed request "
1081: + request.getId();
1082: logger.warn(msg, e);
1083: throw new SQLException(msg + " (" + e + ")");
1084: }
1085:
1086: if (logger.isDebugEnabled())
1087: logger.debug("Request "
1088: + request.getSqlShortForm(dvdb
1089: .getSqlShortFormLength()) + " completed.");
1090:
1091: if (responses.getFailedMembers() != null) { // Some controllers failed ... too bad !
1092: logger
1093: .warn(responses.getFailedMembers().size()
1094: + " controller(s) died during execution of request "
1095: + request.getId());
1096: }
1097: return responses;
1098: }
1099:
1100: /**
1101: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedAbort(String,
1102: * long)
1103: */
1104: public void distributedAbort(String login, long transactionId)
1105: throws SQLException {
1106: try {
1107: List groupMembers = dvdb.getAllMembers();
1108:
1109: AbstractRequest abortRequest = new UnknownWriteRequest(
1110: "abort", false, 0, "\n");
1111: abortRequest.setTransactionId(transactionId);
1112:
1113: MulticastResponse responses = multicastRequest(
1114: abortRequest, new DistributedAbort(login,
1115: transactionId), dvdb.getMessageTimeouts()
1116: .getRollbackTimeout(), groupMembers);
1117:
1118: if (logger.isDebugEnabled())
1119: logger.debug("Abort of transaction " + transactionId
1120: + " completed.");
1121:
1122: // List of controllers that have no more backends to execute queries
1123: ArrayList controllersWithoutBackends = null;
1124: SQLException exception = null;
1125: int size = groupMembers.size();
1126: boolean success = false;
1127: // Get the result of each controller
1128: for (int i = 0; i < size; i++) {
1129: Member member = (Member) groupMembers.get(i);
1130: if ((responses.getFailedMembers() != null)
1131: && responses.getFailedMembers()
1132: .contains(member)) {
1133: logger.warn("Controller " + member
1134: + " is suspected of failure.");
1135: continue;
1136: }
1137: Object r = responses.getResult(member);
1138: if (r instanceof Boolean) {
1139: if (((Boolean) r).booleanValue())
1140: success = true;
1141: else
1142: logger
1143: .error("Unexpected result for controller "
1144: + member);
1145: } else if (r instanceof NoMoreBackendException) {
1146: if (controllersWithoutBackends == null)
1147: controllersWithoutBackends = new ArrayList();
1148: controllersWithoutBackends.add(member);
1149: if (logger.isDebugEnabled())
1150: logger
1151: .debug("Controller "
1152: + member
1153: + " has no more backends to abort transaction "
1154: + transactionId + " (" + r
1155: + ")");
1156: }
1157: }
1158:
1159: // Notify controllers without active backends
1160: notifyRequestCompletion(abortRequest, success, false,
1161: controllersWithoutBackends);
1162:
1163: if (success)
1164: return; // This is a success if at least one controller has succeeded
1165:
1166: if (exception != null)
1167: throw exception;
1168:
1169: // At this point, all controllers failed
1170:
1171: String msg = "Transaction " + transactionId
1172: + " failed to abort on all controllers";
1173: logger.warn(msg);
1174: throw new SQLException(msg);
1175: } catch (SQLException e) {
1176: String msg = "Transaction " + transactionId
1177: + " abort failed (" + e + ")";
1178: logger.warn(msg);
1179: throw e;
1180: }
1181: }
1182:
1183: /**
1184: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCommit(String,
1185: * long)
1186: */
1187: public void distributedCommit(String login, long transactionId)
1188: throws SQLException {
1189: try {
1190: List groupMembers = dvdb.getAllMembers();
1191: AbstractRequest commitRequest = new UnknownWriteRequest(
1192: "commit", false, 0, "\n");
1193: commitRequest.setTransactionId(transactionId);
1194: commitRequest.setLogin(login);
1195: commitRequest.setIsAutoCommit(false);
1196:
1197: MulticastResponse responses = multicastRequest(
1198: commitRequest, new DistributedCommit(login,
1199: transactionId), dvdb.getMessageTimeouts()
1200: .getCommitTimeout(), groupMembers);
1201:
1202: if (logger.isDebugEnabled())
1203: logger.debug("Commit of transaction " + transactionId
1204: + " completed.");
1205:
1206: // List of controllers that gave a AllBackendsFailedException
1207: ArrayList failedOnAllBackends = null;
1208: // List of controllers that have no more backends to execute queries
1209: ArrayList controllersWithoutBackends = null;
1210: SQLException exception = null;
1211: // get a list that won't change while we go through it
1212: groupMembers = dvdb.getAllMembers();
1213: int size = groupMembers.size();
1214: boolean success = false;
1215: // Get the result of each controller
1216: for (int i = 0; i < size; i++) {
1217: Member member = (Member) groupMembers.get(i);
1218: if ((responses.getFailedMembers() != null)
1219: && responses.getFailedMembers()
1220: .contains(member)) {
1221: logger.warn("Controller " + member
1222: + " is suspected of failure.");
1223: continue;
1224: }
1225: Object r = responses.getResult(member);
1226: if (r instanceof Boolean) {
1227: if (((Boolean) r).booleanValue())
1228: success = true;
1229: else
1230: logger
1231: .error("Unexpected result for controller "
1232: + member);
1233: } else if (r instanceof NoMoreBackendException) {
1234: if (controllersWithoutBackends == null)
1235: controllersWithoutBackends = new ArrayList();
1236: controllersWithoutBackends.add(member);
1237: if (logger.isDebugEnabled())
1238: logger
1239: .debug("Controller "
1240: + member
1241: + " has no more backends to commit transaction "
1242: + transactionId + " (" + r
1243: + ")");
1244: } else if (r instanceof AllBackendsFailedException) {
1245: if (failedOnAllBackends == null)
1246: failedOnAllBackends = new ArrayList();
1247: failedOnAllBackends.add(member);
1248: if (logger.isDebugEnabled())
1249: logger
1250: .debug("Commit failed on all backends of controller "
1251: + member + " (" + r + ")");
1252: } else if (r instanceof SQLException) {
1253: if (failedOnAllBackends == null)
1254: failedOnAllBackends = new ArrayList();
1255: failedOnAllBackends.add(member);
1256: String msg = "Commit of transaction "
1257: + transactionId + " failed on controller "
1258: + member + " (" + r + ")";
1259: logger.warn(msg);
1260: exception = (SQLException) r;
1261: }
1262: }
1263:
1264: // Notify all controllers where all backend failed (if any) that
1265: // completion was 'success'
1266: notifyRequestCompletion(commitRequest, success, true,
1267: failedOnAllBackends);
1268:
1269: // Notify controllers without active backends (if any)
1270: notifyRequestCompletion(commitRequest, success, false,
1271: controllersWithoutBackends);
1272:
1273: if (success)
1274: return; // This is a success if at least one controller has succeeded
1275:
1276: // At this point, all controllers failed
1277:
1278: if (exception != null)
1279: throw exception;
1280: else {
1281: String msg = "Transaction " + transactionId
1282: + " failed to commit on all controllers";
1283: logger.warn(msg);
1284: throw new SQLException(msg);
1285: }
1286: } catch (SQLException e) {
1287: String msg = "Transaction " + transactionId
1288: + " commit failed (" + e + ")";
1289: logger.warn(msg);
1290: throw e;
1291: }
1292: }
1293:
1294: /**
1295: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedRollback(String,
1296: * long)
1297: */
1298: public void distributedRollback(String login, long transactionId)
1299: throws SQLException {
1300: try {
1301: List groupMembers = dvdb.getAllMembers();
1302:
1303: AbstractRequest rollbackRequest = new UnknownWriteRequest(
1304: "rollback", false, 0, "\n");
1305: rollbackRequest.setTransactionId(transactionId);
1306: rollbackRequest.setLogin(login);
1307: rollbackRequest.setIsAutoCommit(false);
1308:
1309: MulticastResponse responses = multicastRequest(
1310: rollbackRequest, new DistributedRollback(login,
1311: transactionId), dvdb.getMessageTimeouts()
1312: .getRollbackTimeout(), groupMembers);
1313:
1314: if (logger.isDebugEnabled())
1315: logger.debug("Rollback of transaction " + transactionId
1316: + " completed.");
1317:
1318: // List of controllers that gave a AllBackendsFailedException
1319: ArrayList failedOnAllBackends = null;
1320: // List of controllers that have no more backends to execute queries
1321: ArrayList controllersWithoutBackends = null;
1322: SQLException exception = null;
1323:
1324: // get a list that won't change while we go through it
1325: groupMembers = dvdb.getAllMembers();
1326: int size = groupMembers.size();
1327: boolean success = false;
1328: // Get the result of each controller
1329: for (int i = 0; i < size; i++) {
1330: Member member = (Member) groupMembers.get(i);
1331: if ((responses.getFailedMembers() != null)
1332: && responses.getFailedMembers()
1333: .contains(member)) {
1334: logger.warn("Controller " + member
1335: + " is suspected of failure.");
1336: continue;
1337: }
1338: Object r = responses.getResult(member);
1339: if (r instanceof Boolean) {
1340: if (((Boolean) r).booleanValue())
1341: success = true;
1342: else
1343: logger
1344: .error("Unexpected result for controller "
1345: + member);
1346: } else if (r instanceof NoMoreBackendException) {
1347: if (controllersWithoutBackends == null)
1348: controllersWithoutBackends = new ArrayList();
1349: controllersWithoutBackends.add(member);
1350: if (logger.isDebugEnabled())
1351: logger
1352: .debug("Controller "
1353: + member
1354: + " has no more backends to rollback transaction "
1355: + transactionId + " (" + r
1356: + ")");
1357: } else if (r instanceof AllBackendsFailedException) {
1358: if (failedOnAllBackends == null)
1359: failedOnAllBackends = new ArrayList();
1360: failedOnAllBackends.add(member);
1361: if (logger.isDebugEnabled())
1362: logger
1363: .debug("Rollback failed on all backends of controller "
1364: + member + " (" + r + ")");
1365: } else if (r instanceof SQLException) {
1366: if (failedOnAllBackends == null)
1367: failedOnAllBackends = new ArrayList();
1368: failedOnAllBackends.add(member);
1369: String msg = "Rollback of transaction "
1370: + transactionId + " failed on controller "
1371: + member + " (" + r + ")";
1372: logger.warn(msg);
1373: exception = (SQLException) r;
1374: }
1375: }
1376:
1377: // Notify all controllers where all backend failed (if any) that
1378: // completion was 'success'
1379: notifyRequestCompletion(rollbackRequest, success, true,
1380: failedOnAllBackends);
1381:
1382: // Notify controllers without active backends (if any)
1383: notifyRequestCompletion(rollbackRequest, success, false,
1384: controllersWithoutBackends);
1385:
1386: if (success)
1387: return; // This is a success if at least one controller has succeeded
1388:
1389: if (exception != null)
1390: throw exception;
1391:
1392: // At this point, all controllers failed
1393:
1394: String msg = "Transaction " + transactionId
1395: + " failed to rollback on all controllers";
1396: logger.warn(msg);
1397: throw new SQLException(msg);
1398: } catch (SQLException e) {
1399: String msg = "Transaction " + transactionId
1400: + " rollback failed (" + e + ")";
1401: logger.warn(msg);
1402: throw e;
1403: }
1404: }
1405:
1406: /**
1407: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedRollback(String,
1408: * long, String)
1409: */
1410: public void distributedRollback(String login, long transactionId,
1411: String savepointName) throws SQLException {
1412: try {
1413: List groupMembers = dvdb.getAllMembers();
1414:
1415: AbstractRequest rollbackRequest = new UnknownWriteRequest(
1416: "rollback " + savepointName, false, 0, "\n");
1417: rollbackRequest.setTransactionId(transactionId);
1418: rollbackRequest.setLogin(login);
1419: rollbackRequest.setIsAutoCommit(false);
1420:
1421: MulticastResponse responses = multicastRequest(
1422: rollbackRequest,
1423: new DistributedRollbackToSavepoint(transactionId,
1424: savepointName), dvdb.getMessageTimeouts()
1425: .getRollbackToSavepointTimeout(),
1426: groupMembers);
1427:
1428: if (logger.isDebugEnabled())
1429: logger.debug("Rollback to savepoint " + savepointName
1430: + " for " + "transaction " + transactionId
1431: + " completed.");
1432:
1433: // List of controllers that gave a AllBackendsFailedException
1434: ArrayList failedOnAllBackends = null;
1435: // List of controllers that have no more backends to execute queries
1436: ArrayList controllersWithoutBackends = null;
1437: SQLException exception = null;
1438:
1439: // get a list that won't change while we go through it
1440: groupMembers = dvdb.getAllMembers();
1441: int size = groupMembers.size();
1442: boolean success = false;
1443: // Get the result of each controller
1444: for (int i = 0; i < size; i++) {
1445: Member member = (Member) groupMembers.get(i);
1446: if ((responses.getFailedMembers() != null)
1447: && responses.getFailedMembers()
1448: .contains(member)) {
1449: logger.warn("Controller " + member
1450: + " is suspected of failure.");
1451: continue;
1452: }
1453: Object r = responses.getResult(member);
1454: if (r instanceof Boolean) {
1455: if (((Boolean) r).booleanValue())
1456: success = true;
1457: else
1458: logger
1459: .error("Unexpected result for controller "
1460: + member);
1461: } else if (r instanceof NoMoreBackendException) {
1462: if (controllersWithoutBackends == null)
1463: controllersWithoutBackends = new ArrayList();
1464: controllersWithoutBackends.add(member);
1465: if (logger.isDebugEnabled())
1466: logger.debug("Controller " + member
1467: + " has no more backends to "
1468: + "rollback to savepoint "
1469: + savepointName + " for "
1470: + "transaction " + transactionId + " ("
1471: + r + ")");
1472: } else if (r instanceof AllBackendsFailedException) {
1473: if (failedOnAllBackends == null)
1474: failedOnAllBackends = new ArrayList();
1475: failedOnAllBackends.add(member);
1476: if (logger.isDebugEnabled())
1477: logger
1478: .debug("rollback to savepoint failed on all backends of "
1479: + "controller "
1480: + member
1481: + " ("
1482: + r + ")");
1483: } else if (r instanceof SQLException) {
1484: if (failedOnAllBackends == null)
1485: failedOnAllBackends = new ArrayList();
1486: failedOnAllBackends.add(member);
1487: String msg = "rollback to savepoint "
1488: + savepointName + " for " + "transaction "
1489: + transactionId + " failed on controller "
1490: + member + " (" + r + ")";
1491: logger.warn(msg);
1492: exception = (SQLException) r;
1493: }
1494: }
1495:
1496: // Notify all controllers where all backend failed (if any) that
1497: // completion was 'success'
1498: notifyRequestCompletion(rollbackRequest, success, true,
1499: failedOnAllBackends);
1500:
1501: // Notify controllers without active backends (if any)
1502: notifyRequestCompletion(rollbackRequest, success, false,
1503: controllersWithoutBackends);
1504:
1505: if (success)
1506: return; // This is a success if at least one controller has succeeded
1507:
1508: if (exception != null)
1509: throw exception;
1510:
1511: // At this point, all controllers failed
1512:
1513: if (exception != null)
1514: throw exception;
1515: else {
1516: String msg = "Rollback to savepoint " + savepointName
1517: + " for " + "transaction " + transactionId
1518: + " failed on all controllers";
1519: logger.warn(msg);
1520: throw new SQLException(msg);
1521: }
1522: } catch (SQLException e) {
1523: String msg = "Rollback to savepoint " + savepointName
1524: + " for " + "transaction " + transactionId
1525: + " failed (" + e + ")";
1526: logger.warn(msg);
1527: throw e;
1528: }
1529: }
1530:
1531: /**
1532: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedSetSavepoint(String,
1533: * long, String)
1534: */
1535: public void distributedSetSavepoint(String login,
1536: long transactionId, String name) throws SQLException {
1537: try {
1538: List groupMembers = dvdb.getAllMembers();
1539:
1540: AbstractRequest setSavepointRequest = new UnknownWriteRequest(
1541: "savepoint " + name, false, 0, "\n");
1542: setSavepointRequest.setTransactionId(transactionId);
1543: setSavepointRequest.setLogin(login);
1544: setSavepointRequest.setIsAutoCommit(false);
1545:
1546: MulticastResponse responses = multicastRequest(
1547: setSavepointRequest, new DistributedSetSavepoint(
1548: login, transactionId, name), dvdb
1549: .getMessageTimeouts()
1550: .getSetSavepointTimeout(), groupMembers);
1551:
1552: if (logger.isDebugEnabled())
1553: logger.debug("Set savepoint " + name
1554: + " to transaction " + transactionId
1555: + " completed.");
1556:
1557: // List of controllers that gave a AllBackendsFailedException
1558: ArrayList failedOnAllBackends = null;
1559: // List of controllers that have no more backends to execute queries
1560: ArrayList controllersWithoutBackends = null;
1561: SQLException exception = null;
1562:
1563: // get a list that won't change while we go through it
1564: groupMembers = dvdb.getAllMembers();
1565: int size = groupMembers.size();
1566: boolean success = false;
1567: // Get the result of each controller
1568: for (int i = 0; i < size; i++) {
1569: Member member = (Member) groupMembers.get(i);
1570: if ((responses.getFailedMembers() != null)
1571: && responses.getFailedMembers()
1572: .contains(member)) {
1573: logger.warn("Controller " + member
1574: + " is suspected of failure.");
1575: continue;
1576: }
1577: Object r = responses.getResult(member);
1578: if (r instanceof Boolean) {
1579: if (((Boolean) r).booleanValue())
1580: success = true;
1581: else
1582: logger
1583: .error("Unexpected result for controller "
1584: + member);
1585: } else if (r instanceof NoMoreBackendException) {
1586: if (controllersWithoutBackends == null)
1587: controllersWithoutBackends = new ArrayList();
1588: controllersWithoutBackends.add(member);
1589: if (logger.isDebugEnabled())
1590: logger.debug("Controller " + member
1591: + " has no more backends to "
1592: + "set savepoint " + name
1593: + " to transaction " + transactionId
1594: + " (" + r + ")");
1595: } else if (r instanceof AllBackendsFailedException) {
1596: if (failedOnAllBackends == null)
1597: failedOnAllBackends = new ArrayList();
1598: failedOnAllBackends.add(member);
1599: if (logger.isDebugEnabled())
1600: logger
1601: .debug("set savepoint failed on all backends of controller "
1602: + member + " (" + r + ")");
1603: } else if (r instanceof SQLException) {
1604: if (failedOnAllBackends == null)
1605: failedOnAllBackends = new ArrayList();
1606: failedOnAllBackends.add(member);
1607: String msg = "set savepoint " + name
1608: + " to transaction " + transactionId
1609: + " failed on controller " + member + " ("
1610: + r + ")";
1611: logger.warn(msg);
1612: exception = (SQLException) r;
1613: }
1614: }
1615:
1616: // Notify all controllers where all backend failed (if any) that
1617: // completion was 'success'
1618: notifyRequestCompletion(setSavepointRequest, success, true,
1619: failedOnAllBackends);
1620:
1621: // Notify controllers without active backends (if any)
1622: notifyRequestCompletion(setSavepointRequest, success,
1623: false, controllersWithoutBackends);
1624:
1625: if (success)
1626: return; // This is a success if at least one controller has succeeded
1627:
1628: if (exception != null)
1629: throw exception;
1630:
1631: // At this point, all controllers failed
1632:
1633: if (exception != null)
1634: throw exception;
1635: else {
1636: String msg = "Set savepoint " + name
1637: + " to transaction " + transactionId
1638: + " failed on all controllers";
1639: logger.warn(msg);
1640: throw new SQLException(msg);
1641: }
1642: } catch (SQLException e) {
1643: String msg = "Set savepoint " + name + " to transaction "
1644: + transactionId + " failed (" + e + ")";
1645: logger.warn(msg);
1646: throw e;
1647: }
1648: }
1649:
1650: /**
1651: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedReleaseSavepoint(String,
1652: * long, String)
1653: */
1654: public void distributedReleaseSavepoint(String login,
1655: long transactionId, String name) throws SQLException {
1656: try {
1657: List groupMembers = dvdb.getAllMembers();
1658:
1659: AbstractRequest releaseSavepointRequest = new UnknownWriteRequest(
1660: "release " + name, false, 0, "\n");
1661: releaseSavepointRequest.setTransactionId(transactionId);
1662: releaseSavepointRequest.setLogin(login);
1663: releaseSavepointRequest.setIsAutoCommit(false);
1664:
1665: MulticastResponse responses = multicastRequest(
1666: releaseSavepointRequest,
1667: new DistributedReleaseSavepoint(transactionId, name),
1668: dvdb.getMessageTimeouts()
1669: .getReleaseSavepointTimeout(), groupMembers);
1670:
1671: if (logger.isDebugEnabled())
1672: logger.debug("Release savepoint " + name
1673: + " from transaction " + transactionId
1674: + " completed.");
1675:
1676: // List of controllers that gave a AllBackendsFailedException
1677: ArrayList failedOnAllBackends = null;
1678: // List of controllers that have no more backends to execute queries
1679: ArrayList controllersWithoutBackends = null;
1680: SQLException exception = null;
1681:
1682: // get a list that won't change while we go through it
1683: groupMembers = dvdb.getAllMembers();
1684: int size = groupMembers.size();
1685: boolean success = false;
1686: // Get the result of each controller
1687: for (int i = 0; i < size; i++) {
1688: Member member = (Member) groupMembers.get(i);
1689: if ((responses.getFailedMembers() != null)
1690: && responses.getFailedMembers()
1691: .contains(member)) {
1692: logger.warn("Controller " + member
1693: + " is suspected of failure.");
1694: continue;
1695: }
1696: Object r = responses.getResult(member);
1697: if (r instanceof Boolean) {
1698: if (((Boolean) r).booleanValue())
1699: success = true;
1700: else
1701: logger
1702: .error("Unexpected result for controller "
1703: + member);
1704: } else if (r instanceof NoMoreBackendException) {
1705: if (controllersWithoutBackends == null)
1706: controllersWithoutBackends = new ArrayList();
1707: controllersWithoutBackends.add(member);
1708: if (logger.isDebugEnabled())
1709: logger.debug("Controller " + member
1710: + " has no more backends to "
1711: + "release savepoint " + name
1712: + " from transaction " + transactionId
1713: + " (" + r + ")");
1714: } else if (r instanceof AllBackendsFailedException) {
1715: if (failedOnAllBackends == null)
1716: failedOnAllBackends = new ArrayList();
1717: failedOnAllBackends.add(member);
1718: if (logger.isDebugEnabled())
1719: logger
1720: .debug("release savepoint failed on all backends of "
1721: + "controller "
1722: + member
1723: + " ("
1724: + r + ")");
1725: } else if (r instanceof SQLException) {
1726: if (failedOnAllBackends == null)
1727: failedOnAllBackends = new ArrayList();
1728: failedOnAllBackends.add(member);
1729: String msg = "release savepoint " + name
1730: + " from transaction " + transactionId
1731: + " failed on controller " + member + " ("
1732: + r + ")";
1733: logger.warn(msg);
1734: exception = (SQLException) r;
1735: }
1736: }
1737:
1738: // Notify all controllers where all backend failed (if any) that
1739: // completion was 'success'
1740: notifyRequestCompletion(releaseSavepointRequest, success,
1741: true, failedOnAllBackends);
1742:
1743: // Notify controllers without active backends (if any)
1744: notifyRequestCompletion(releaseSavepointRequest, success,
1745: false, controllersWithoutBackends);
1746:
1747: if (success)
1748: return; // This is a success if at least one controller has succeeded
1749:
1750: if (exception != null)
1751: throw exception;
1752:
1753: // At this point, all controllers failed
1754:
1755: if (exception != null)
1756: throw exception;
1757: else {
1758: String msg = "Release savepoint " + name
1759: + " from transaction " + transactionId
1760: + " failed on all controllers";
1761: logger.warn(msg);
1762: throw new SQLException(msg);
1763: }
1764: } catch (SQLException e) {
1765: String msg = "Release savepoint " + name
1766: + " from transaction " + transactionId
1767: + " failed (" + e + ")";
1768: logger.warn(msg);
1769: throw e;
1770: }
1771: }
1772:
1773: }
|