0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Jean-Bernard van Zuylen, Damian Arregui.
0023: */package org.continuent.sequoia.controller.requestmanager.distributed;
0024:
0025: import java.io.Serializable;
0026: import java.sql.SQLException;
0027: import java.util.ArrayList;
0028: import java.util.Iterator;
0029: import java.util.List;
0030:
0031: import javax.management.NotCompliantMBeanException;
0032:
0033: import org.continuent.hedera.adapters.MulticastRequestAdapter;
0034: import org.continuent.hedera.adapters.MulticastResponse;
0035: import org.continuent.hedera.common.Member;
0036: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0037: import org.continuent.sequoia.common.exceptions.NoResultAvailableException;
0038: import org.continuent.sequoia.common.i18n.Translate;
0039: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0040: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0041: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0042: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0043: import org.continuent.sequoia.controller.cache.result.AbstractResultCache;
0044: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0045: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0046: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0047: import org.continuent.sequoia.controller.requests.AbstractRequest;
0048: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0049: import org.continuent.sequoia.controller.requests.SelectRequest;
0050: import org.continuent.sequoia.controller.requests.StoredProcedure;
0051: import org.continuent.sequoia.controller.requests.StoredProcedureCallResult;
0052: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
0053: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0054: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
0055: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedAbort;
0056: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCallableStatementExecute;
0057: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCallableStatementExecuteQuery;
0058: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCallableStatementExecuteUpdate;
0059: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0060: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0061: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest;
0062: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0063: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0064: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0065: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecute;
0066: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecuteQuery;
0067: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecuteUpdate;
0068: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedStatementExecuteUpdateWithKeys;
0069: import org.continuent.sequoia.controller.virtualdatabase.protocol.ExecRemoteStatementExecuteQuery;
0070: import org.continuent.sequoia.controller.virtualdatabase.protocol.MessageTimeouts;
0071:
0072: /**
0073: * This class defines a RAIDb1DistributedRequestManager
0074: *
0075: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0076: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0077: * </a>
0078: * @author <a href="mailto:Damian.Arregui@emicnetworks.com">Damian Arregui</a>
0079: * @version 1.0
0080: */
0081: public class RAIDb1DistributedRequestManager extends
0082: DistributedRequestManager {
0083: /**
0084: * Creates a new <code>RAIDb1DistributedRequestManager</code> instance
0085: *
0086: * @param vdb the virtual database this request manager belongs to
0087: * @param scheduler the Request Scheduler to use
0088: * @param cache a Query Cache implementation
0089: * @param loadBalancer the Request Load Balancer to use
0090: * @param recoveryLog the Log Recovery to use
0091: * @param beginTimeout timeout in seconds for begin
0092: * @param commitTimeout timeout in seconds for commit
0093: * @param rollbackTimeout timeout in seconds for rollback
0094: * @throws SQLException if an error occurs
0095: * @throws NotCompliantMBeanException if the MBean is not JMX compliant
0096: */
0097: public RAIDb1DistributedRequestManager(
0098: DistributedVirtualDatabase vdb,
0099: AbstractScheduler scheduler, AbstractResultCache cache,
0100: AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog,
0101: long beginTimeout, long commitTimeout, long rollbackTimeout)
0102: throws SQLException, NotCompliantMBeanException {
0103: super (vdb, scheduler, cache, loadBalancer, recoveryLog,
0104: beginTimeout, commitTimeout, rollbackTimeout);
0105: }
0106:
0107: /**
0108: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest)
0109: */
0110: public ControllerResultSet distributedStatementExecuteQuery(
0111: SelectRequest request) throws SQLException {
0112: return executeRequestReturningResultSet(request,
0113: new DistributedStatementExecuteQuery(request), dvdb
0114: .getMessageTimeouts()
0115: .getExecReadRequestTimeout());
0116: }
0117:
0118: /**
0119: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#execRemoteStatementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest)
0120: */
0121: public ControllerResultSet execRemoteStatementExecuteQuery(
0122: SelectRequest request) throws SQLException {
0123: if (dvdb.isProcessMacroBeforeBroadcast())
0124: loadBalancer.handleMacros(request);
0125: try {
0126: // Iterate over controllers in members list (but us) until someone
0127: // successfully executes our request.
0128: Iterator i = dvdb.getAllMemberButUs().iterator();
0129: SQLException error = null;
0130: while (i.hasNext()) {
0131: Member controller = (Member) i.next();
0132: List groupMembers = new ArrayList();
0133: groupMembers.add(controller);
0134:
0135: if (logger.isDebugEnabled())
0136: logger
0137: .debug("Sending request "
0138: + request.getSqlShortForm(dvdb
0139: .getSqlShortFormLength())
0140: + (request.isAutoCommit() ? ""
0141: : " transaction "
0142: + request
0143: .getTransactionId())
0144: + " to " + controller);
0145:
0146: // Send query to remote controller
0147: MulticastResponse responses;
0148: responses = dvdb.getMulticastRequestAdapter()
0149: .multicastMessage(
0150: groupMembers,
0151: new ExecRemoteStatementExecuteQuery(
0152: request),
0153: MulticastRequestAdapter.WAIT_ALL,
0154: MessageTimeouts.getMinTimeout(dvdb
0155: .getMessageTimeouts()
0156: .getExecReadRequestTimeout(),
0157: request.getTimeout() * 1000L));
0158:
0159: if (logger.isDebugEnabled())
0160: logger.debug("Request "
0161: + request.getSqlShortForm(dvdb
0162: .getSqlShortFormLength())
0163: + " completed.");
0164:
0165: Object ret = responses.getResult(controller);
0166: if (ret instanceof ControllerResultSet)
0167: return (ControllerResultSet) ret;
0168: else if (ret instanceof SQLException)
0169: error = (SQLException) ret;
0170: }
0171:
0172: if (error == null) {
0173: // No one answered, throw
0174: throw new NoMoreBackendException();
0175: } else
0176: throw error;
0177: } catch (Exception e) {
0178: String msg = "An error occured while executing remote select request "
0179: + request.getId();
0180: logger.warn(msg, e);
0181: if (e instanceof SQLException)
0182: throw (SQLException) e;
0183: else
0184: throw new SQLException(msg + " (" + e + ")");
0185: }
0186: }
0187:
0188: /**
0189: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecuteUpdate(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
0190: */
0191: public ExecuteUpdateResult distributedStatementExecuteUpdate(
0192: AbstractWriteRequest request) throws SQLException {
0193: return executeRequestReturningInt(request,
0194: new DistributedStatementExecuteUpdate(request), dvdb
0195: .getMessageTimeouts()
0196: .getExecWriteRequestTimeout());
0197: }
0198:
0199: /**
0200: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecuteUpdateWithKeys(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
0201: */
0202: public GeneratedKeysResult distributedStatementExecuteUpdateWithKeys(
0203: AbstractWriteRequest request) throws SQLException {
0204: return executeRequestReturningGeneratedKeys(request,
0205: new DistributedStatementExecuteUpdateWithKeys(request),
0206: dvdb.getMessageTimeouts()
0207: .getExecWriteRequestWithKeysTimeout());
0208: }
0209:
0210: /**
0211: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedStatementExecute(org.continuent.sequoia.controller.requests.AbstractRequest)
0212: */
0213: public ExecuteResult distributedStatementExecute(
0214: AbstractRequest request) throws SQLException {
0215: return executeRequestReturningExecuteResult(request,
0216: new DistributedStatementExecute(request), dvdb
0217: .getMessageTimeouts()
0218: .getExecReadStoredProcedureTimeout());
0219: }
0220:
0221: /**
0222: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCallableStatementExecuteQuery(StoredProcedure)
0223: */
0224: public ControllerResultSet distributedCallableStatementExecuteQuery(
0225: StoredProcedure proc) throws SQLException {
0226: StoredProcedureCallResult result = executeRequestReturningStoredProcedureCallResult(
0227: proc,
0228: new DistributedCallableStatementExecuteQuery(proc),
0229: dvdb.getMessageTimeouts()
0230: .getExecReadStoredProcedureTimeout());
0231: return (ControllerResultSet) result.getResult();
0232: }
0233:
0234: /**
0235: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCallableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0236: */
0237: public ExecuteUpdateResult distributedCallableStatementExecuteUpdate(
0238: StoredProcedure proc) throws SQLException {
0239: StoredProcedureCallResult result = executeRequestReturningStoredProcedureCallResult(
0240: proc, new DistributedCallableStatementExecuteUpdate(
0241: proc), dvdb.getMessageTimeouts()
0242: .getExecWriteStoredProcedureTimeout());
0243: return ((ExecuteUpdateResult) result.getResult());
0244: }
0245:
0246: /**
0247: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCallableStatementExecute(StoredProcedure)
0248: */
0249: public ExecuteResult distributedCallableStatementExecute(
0250: StoredProcedure proc) throws SQLException {
0251: StoredProcedureCallResult result = executeRequestReturningStoredProcedureCallResult(
0252: proc, new DistributedCallableStatementExecute(proc),
0253: dvdb.getMessageTimeouts()
0254: .getExecReadStoredProcedureTimeout());
0255: return (ExecuteResult) result.getResult();
0256: }
0257:
0258: /**
0259: * Execute a distributed request that returns a GeneratedKeysResult.
0260: *
0261: * @param request the request to execute
0262: * @param requestMsg message to be sent through the group communication
0263: * @param messageTimeout group message timeout
0264: * @return the corresponding ExecuteResult
0265: * @throws SQLException if an error occurs
0266: */
0267: private ExecuteResult executeRequestReturningExecuteResult(
0268: AbstractRequest request, DistributedRequest requestMsg,
0269: long messageTimeout) throws SQLException {
0270: if (dvdb.isProcessMacroBeforeBroadcast())
0271: loadBalancer.handleMacros(request);
0272: try {
0273: ExecuteResult requestResult = null;
0274:
0275: List groupMembers = dvdb.getAllMembers();
0276:
0277: MulticastResponse responses = multicastRequest(request,
0278: requestMsg, messageTimeout, groupMembers);
0279:
0280: // List of controllers that gave a AllBackendsFailedException
0281: ArrayList failedOnAllBackends = null;
0282: // List of controllers that have no more backends to execute queries
0283: ArrayList controllersWithoutBackends = null;
0284: SQLException exception = null;
0285: int size = groupMembers.size();
0286: List successfulControllers = null;
0287: // Get the result of each controller
0288: for (int i = 0; i < size; i++) {
0289: Member member = (Member) groupMembers.get(i);
0290: if ((responses.getFailedMembers() != null)
0291: && responses.getFailedMembers()
0292: .contains(member)) {
0293: logger.warn("Controller " + member
0294: + " is suspected of failure.");
0295: continue;
0296: }
0297: Object r = responses.getResult(member);
0298: if (r instanceof ExecuteResult) {
0299: if (requestResult == null)
0300: requestResult = (ExecuteResult) r;
0301: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0302: .equals(r)) {
0303: // Remote controller success
0304: if (requestResult == null) {
0305: if (successfulControllers == null)
0306: successfulControllers = new ArrayList();
0307: successfulControllers.add(member);
0308: }
0309: } else if (r instanceof NoMoreBackendException) {
0310: if (controllersWithoutBackends == null)
0311: controllersWithoutBackends = new ArrayList();
0312: controllersWithoutBackends.add(member);
0313: if (logger.isDebugEnabled())
0314: logger
0315: .debug("Controller "
0316: + member
0317: + " has no more backends to execute query ("
0318: + r + ")");
0319: } else if (r instanceof Exception) {
0320: if (failedOnAllBackends == null)
0321: failedOnAllBackends = new ArrayList();
0322: failedOnAllBackends.add(member);
0323: String msg = "Request " + request.getId()
0324: + " failed on controller " + member + " ("
0325: + r + ")";
0326: logger.warn(msg);
0327: if (r instanceof SQLException)
0328: exception = (SQLException) r;
0329: else {
0330: exception = new SQLException(
0331: "Internal exception " + r);
0332: exception.initCause((Throwable) r);
0333: }
0334: } else {
0335: if (failedOnAllBackends == null)
0336: failedOnAllBackends = new ArrayList();
0337: failedOnAllBackends.add(member);
0338: if (logger.isWarnEnabled())
0339: logger
0340: .warn("Unexpected answer from controller "
0341: + member
0342: + " ("
0343: + r
0344: + ") for request "
0345: + request
0346: .getSqlShortForm(vdb
0347: .getSqlShortFormLength()));
0348: }
0349: }
0350:
0351: if ((requestResult == null)
0352: && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
0353: // remote controller
0354: try {
0355: requestResult = (ExecuteResult) getRequestResultFromFailoverCache(
0356: successfulControllers, request.getId());
0357: } catch (NoResultAvailableException e) {
0358: exception = new SQLException(
0359: "Request '"
0360: + request
0361: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
0362: }
0363: }
0364:
0365: if ((controllersWithoutBackends != null)
0366: || (failedOnAllBackends != null)) { // Notify all controllers of completion
0367: // Re-create fake stored procedure for proper scheduler notification
0368: // (see DistributedStatementExecute#scheduleRequest)
0369: StoredProcedure proc = new StoredProcedure(request
0370: .getSqlOrTemplate(), request
0371: .getEscapeProcessing(), request.getTimeout(),
0372: request.getLineSeparator());
0373: proc.setIsAutoCommit(request.isAutoCommit());
0374: proc.setTransactionId(request.getTransactionId());
0375: proc.setTransactionIsolation(request
0376: .getTransactionIsolation());
0377: proc.setId(request.getId());
0378:
0379: /*
0380: * Notify all controllers where all backend failed (if any) that
0381: * completion was 'success'. We determine success from the local result
0382: * or the successful remote controllers even though they may have failed
0383: * afterwards (in which case requestResult may be null).
0384: */
0385: boolean success = (requestResult != null || successfulControllers != null);
0386:
0387: notifyRequestCompletion(proc, success, true,
0388: failedOnAllBackends);
0389:
0390: // Notify controllers without active backends (if any)
0391: notifyRequestCompletion(proc, success, false,
0392: controllersWithoutBackends);
0393: }
0394:
0395: if (requestResult != null) {
0396: if (logger.isDebugEnabled())
0397: logger.debug("Request " + request.getId()
0398: + " completed successfully.");
0399: return requestResult;
0400: }
0401:
0402: // At this point, all controllers failed
0403:
0404: if (exception != null)
0405: throw exception;
0406: else {
0407: String msg = "Request '"
0408: + request.getSqlShortForm(vdb
0409: .getSqlShortFormLength())
0410: + "' failed on all controllers";
0411: logger.warn(msg);
0412: throw new SQLException(msg);
0413: }
0414: } catch (SQLException e) {
0415: String msg = Translate.get("loadbalancer.request.failed",
0416: new String[] {
0417: request.getSqlShortForm(vdb
0418: .getSqlShortFormLength()),
0419: e.getMessage() });
0420: logger.warn(msg);
0421: throw e;
0422: }
0423: }
0424:
0425: /**
0426: * Execute a distributed request that returns a GeneratedKeysResult.
0427: *
0428: * @param request the request to execute
0429: * @param requestMsg message to be sent through the group communication
0430: * @param messageTimeout group message timeout
0431: * @return the corresponding GeneratedKeysResult
0432: * @throws SQLException if an error occurs
0433: */
0434: private GeneratedKeysResult executeRequestReturningGeneratedKeys(
0435: AbstractRequest request, DistributedRequest requestMsg,
0436: long messageTimeout) throws SQLException {
0437: if (dvdb.isProcessMacroBeforeBroadcast())
0438: loadBalancer.handleMacros(request);
0439: try {
0440: GeneratedKeysResult requestResult = null;
0441:
0442: List groupMembers = dvdb.getAllMembers();
0443:
0444: MulticastResponse responses = multicastRequest(request,
0445: requestMsg, messageTimeout, groupMembers);
0446:
0447: // List of controllers that gave a AllBackendsFailedException
0448: ArrayList failedOnAllBackends = null;
0449: // List of controllers that gave an inconsistent result
0450: ArrayList inconsistentControllers = null;
0451: // List of controllers that have no more backends to execute queries
0452: ArrayList controllersWithoutBackends = null;
0453: SQLException exception = null;
0454: List successfulControllers = null;
0455: int size = groupMembers.size();
0456: // Get the result of each controller
0457: for (int i = 0; i < size; i++) {
0458: Member member = (Member) groupMembers.get(i);
0459: if ((responses.getFailedMembers() != null)
0460: && responses.getFailedMembers()
0461: .contains(member)) {
0462: logger.warn("Controller " + member
0463: + " is suspected of failure.");
0464: continue;
0465: }
0466: Object r = responses.getResult(member);
0467: if (r instanceof GeneratedKeysResult) {
0468: if (requestResult == null) {
0469: requestResult = (GeneratedKeysResult) r;
0470: } else {
0471: if (requestResult.getUpdateCount() != ((GeneratedKeysResult) r)
0472: .getUpdateCount()) {
0473: if (inconsistentControllers == null)
0474: inconsistentControllers = new ArrayList();
0475: inconsistentControllers.add(member);
0476: logger
0477: .error("Controller "
0478: + member
0479: + " returned an inconsistent results ("
0480: + requestResult
0481: + " when expecting " + r
0482: + ") for request "
0483: + request.getId());
0484: }
0485: }
0486: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0487: .equals(r)) {
0488: // Remote controller success
0489: if (requestResult == null) {
0490: if (successfulControllers == null)
0491: successfulControllers = new ArrayList();
0492: successfulControllers.add(member);
0493: }
0494: } else if (r instanceof NoMoreBackendException) {
0495: if (controllersWithoutBackends == null)
0496: controllersWithoutBackends = new ArrayList();
0497: controllersWithoutBackends.add(member);
0498: if (logger.isDebugEnabled())
0499: logger
0500: .debug("Controller "
0501: + member
0502: + " has no more backends to execute query ("
0503: + r + ")");
0504: } else if (r instanceof Exception) {
0505: if (failedOnAllBackends == null)
0506: failedOnAllBackends = new ArrayList();
0507: failedOnAllBackends.add(member);
0508: String msg = "Request " + request.getId()
0509: + " failed on controller " + member + " ("
0510: + r + ")";
0511: logger.warn(msg);
0512: if (r instanceof SQLException)
0513: exception = (SQLException) r;
0514: else {
0515: exception = new SQLException(
0516: "Internal exception " + r);
0517: exception.initCause((Throwable) r);
0518: }
0519: } else {
0520: if (failedOnAllBackends == null)
0521: failedOnAllBackends = new ArrayList();
0522: failedOnAllBackends.add(member);
0523: if (logger.isWarnEnabled())
0524: logger
0525: .warn("Unexpected answer from controller "
0526: + member
0527: + " ("
0528: + r
0529: + ") for request "
0530: + request
0531: .getSqlShortForm(vdb
0532: .getSqlShortFormLength()));
0533: }
0534: }
0535:
0536: if ((requestResult == null)
0537: && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
0538: // remote controller
0539: try {
0540: requestResult = (GeneratedKeysResult) getRequestResultFromFailoverCache(
0541: successfulControllers, request.getId());
0542: } catch (NoResultAvailableException e) {
0543: exception = new SQLException(
0544: "Request '"
0545: + request
0546: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
0547: }
0548: }
0549:
0550: /*
0551: * Notify all controllers where all backend failed (if any) that
0552: * completion was 'success'. We determine success from the successful
0553: * controllers even though they may have failed afterwards (in which case
0554: * requestResult may be null).
0555: */
0556: boolean success = (requestResult != null || successfulControllers != null);
0557: notifyRequestCompletion(request, success, true,
0558: failedOnAllBackends);
0559:
0560: // Notify controllers without active backends (if any)
0561: notifyRequestCompletion(request, success, false,
0562: controllersWithoutBackends);
0563:
0564: if (inconsistentControllers != null) { // Notify all inconsistent controllers to disable their backends
0565: notifyControllerInconsistency(request,
0566: inconsistentControllers);
0567: }
0568:
0569: if (requestResult != null) {
0570: if (logger.isDebugEnabled())
0571: logger.debug("Request " + request.getId()
0572: + " completed successfully.");
0573: return requestResult;
0574: }
0575:
0576: // At this point, all controllers failed
0577:
0578: if (exception != null)
0579: throw exception;
0580: else {
0581: String msg = "Request '" + request
0582: + "' failed on all controllers";
0583: logger.warn(msg);
0584: throw new SQLException(msg);
0585: }
0586: } catch (SQLException e) {
0587: String msg = Translate.get("loadbalancer.request.failed",
0588: new String[] {
0589: request.getSqlShortForm(vdb
0590: .getSqlShortFormLength()),
0591: e.getMessage() });
0592: logger.warn(msg);
0593: throw e;
0594: }
0595: }
0596:
0597: /**
0598: * Execute a distributed request that returns a ResultSet.
0599: *
0600: * @param request the request to execute
0601: * @param requestMsg message to be sent through the group communication
0602: * @param messageTimeout group message timeout
0603: * @return the corresponding ResultSet
0604: * @throws SQLException if an error occurs
0605: */
0606: private ExecuteUpdateResult executeRequestReturningInt(
0607: AbstractRequest request, DistributedRequest requestMsg,
0608: long messageTimeout) throws SQLException {
0609: if (dvdb.isProcessMacroBeforeBroadcast())
0610: loadBalancer.handleMacros(request);
0611: try {
0612: ExecuteUpdateResult requestResult = null;
0613:
0614: List groupMembers = dvdb.getAllMembers();
0615:
0616: MulticastResponse responses = multicastRequest(request,
0617: requestMsg, messageTimeout, groupMembers);
0618:
0619: // List of controllers that gave a AllBackendsFailedException
0620: ArrayList failedOnAllBackends = null;
0621: // List of controllers that gave an inconsistent result
0622: ArrayList inconsistentControllers = null;
0623: // List of controllers that have no more backends to execute queries
0624: ArrayList controllersWithoutBackends = null;
0625: SQLException exception = null;
0626: int size = groupMembers.size();
0627: // Get the result of each controller
0628: for (int i = 0; i < size; i++) {
0629: Member member = (Member) groupMembers.get(i);
0630: if ((responses.getFailedMembers() != null)
0631: && responses.getFailedMembers()
0632: .contains(member)) {
0633: logger.warn("Controller " + member
0634: + " is suspected of failure.");
0635: continue;
0636: }
0637: Object r = responses.getResult(member);
0638: if (r instanceof ExecuteUpdateResult) {
0639: if (requestResult == null)
0640: requestResult = (ExecuteUpdateResult) r;
0641: else {
0642: if (requestResult.getUpdateCount() != ((ExecuteUpdateResult) r)
0643: .getUpdateCount()) {
0644: if (inconsistentControllers == null)
0645: inconsistentControllers = new ArrayList();
0646: inconsistentControllers.add(member);
0647: logger
0648: .error("Controller "
0649: + member
0650: + " returned an inconsistent results ("
0651: + requestResult
0652: .getUpdateCount()
0653: + " when expecting "
0654: + ((ExecuteUpdateResult) r)
0655: .getUpdateCount()
0656: + ") for request "
0657: + request.getId());
0658: }
0659: }
0660: } else if (r instanceof NoMoreBackendException) {
0661: if (controllersWithoutBackends == null)
0662: controllersWithoutBackends = new ArrayList();
0663: controllersWithoutBackends.add(member);
0664: if (logger.isDebugEnabled())
0665: logger
0666: .debug("Controller "
0667: + member
0668: + " has no more backends to execute query ("
0669: + r + ")");
0670: } else if (r instanceof Exception) {
0671: if (failedOnAllBackends == null)
0672: failedOnAllBackends = new ArrayList();
0673: failedOnAllBackends.add(member);
0674: String msg = "Request " + request.getId()
0675: + " failed on controller " + member + " ("
0676: + r + ")";
0677: logger.warn(msg);
0678: if (r instanceof SQLException)
0679: exception = (SQLException) r;
0680: else {
0681: exception = new SQLException(
0682: "Internal exception " + r);
0683: exception.initCause((Throwable) r);
0684: }
0685: } else {
0686: if (failedOnAllBackends == null)
0687: failedOnAllBackends = new ArrayList();
0688: failedOnAllBackends.add(member);
0689: if (logger.isWarnEnabled())
0690: logger
0691: .warn("Unexpected answer from controller "
0692: + member
0693: + " ("
0694: + r
0695: + ") for request "
0696: + request
0697: .getSqlShortForm(vdb
0698: .getSqlShortFormLength()));
0699: }
0700: }
0701:
0702: int requestUpdateCount = -1;
0703: if (requestResult != null)
0704: requestUpdateCount = requestResult.getUpdateCount();
0705:
0706: // Notify all controllers where all backends failed (if any) that
0707: // completion was 'success'
0708: notifyRequestCompletion(request, requestResult != null,
0709: true, failedOnAllBackends, requestUpdateCount);
0710:
0711: // Notify controllers without active backends (if any)
0712: notifyRequestCompletion(request, requestResult != null,
0713: false, controllersWithoutBackends,
0714: requestUpdateCount);
0715:
0716: if (inconsistentControllers != null) { // Notify all inconsistent controllers to disable their backends
0717: notifyControllerInconsistency(request,
0718: inconsistentControllers);
0719: }
0720:
0721: if (requestResult != null) {
0722: if (logger.isDebugEnabled())
0723: logger.debug("Request " + request.getId()
0724: + " completed successfully.");
0725: return requestResult;
0726: }
0727:
0728: // At this point, all controllers failed
0729:
0730: if (exception != null)
0731: throw exception;
0732: else {
0733: String msg = "Request '" + request
0734: + "' failed on all controllers";
0735: logger.warn(msg);
0736: throw new SQLException(msg);
0737: }
0738: } catch (SQLException e) {
0739: String msg = Translate.get("loadbalancer.request.failed",
0740: new String[] {
0741: request.getSqlShortForm(vdb
0742: .getSqlShortFormLength()),
0743: e.getMessage() });
0744: logger.warn(msg);
0745: throw e;
0746: }
0747: }
0748:
0749: /**
0750: * Execute a distributed request that returns a ResultSet.
0751: *
0752: * @param request the request to execute
0753: * @param requestMsg message to be sent through the group communication
0754: * @param messageTimeout group message timeout
0755: * @return the corresponding ResultSet
0756: * @throws SQLException if an error occurs
0757: */
0758: private ControllerResultSet executeRequestReturningResultSet(
0759: AbstractRequest request, DistributedRequest requestMsg,
0760: long messageTimeout) throws SQLException {
0761: if (dvdb.isProcessMacroBeforeBroadcast())
0762: loadBalancer.handleMacros(request);
0763: try {
0764: ControllerResultSet requestResult = null;
0765:
0766: List groupMembers = dvdb.getAllMembers();
0767:
0768: MulticastResponse responses = multicastRequest(request,
0769: requestMsg, messageTimeout, groupMembers);
0770:
0771: // List of controllers that gave a AllBackendsFailedException
0772: ArrayList failedOnAllBackends = null;
0773: // List of controllers that have no more backends to execute queries
0774: ArrayList controllersWithoutBackends = null;
0775: SQLException exception = null;
0776: int size = groupMembers.size();
0777: List successfulControllers = null;
0778: // Get the result of each controller
0779: for (int i = 0; i < size; i++) {
0780: Member member = (Member) groupMembers.get(i);
0781: if ((responses.getFailedMembers() != null)
0782: && responses.getFailedMembers()
0783: .contains(member)) {
0784: logger.warn("Controller " + member
0785: + " is suspected of failure.");
0786: continue;
0787: }
0788: Object r = responses.getResult(member);
0789: if (r instanceof ControllerResultSet) {
0790: if (requestResult == null)
0791: requestResult = (ControllerResultSet) r;
0792: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0793: .equals(r)) {
0794: // Remote controller success
0795: if (requestResult == null) {
0796: if (successfulControllers == null)
0797: successfulControllers = new ArrayList();
0798: successfulControllers.add(member);
0799: }
0800: } else if (r instanceof NoMoreBackendException) {
0801: if (controllersWithoutBackends == null)
0802: controllersWithoutBackends = new ArrayList();
0803: controllersWithoutBackends.add(member);
0804: if (logger.isDebugEnabled())
0805: logger
0806: .debug("Controller "
0807: + member
0808: + " has no more backends to execute query ("
0809: + r + ")");
0810: } else if (r instanceof Exception) {
0811: if (failedOnAllBackends == null)
0812: failedOnAllBackends = new ArrayList();
0813: failedOnAllBackends.add(member);
0814: String msg = "Request " + request.getId()
0815: + " failed on controller " + member + " ("
0816: + r + ")";
0817: logger.warn(msg);
0818: if (r instanceof SQLException)
0819: exception = (SQLException) r;
0820: else {
0821: exception = new SQLException(
0822: "Internal exception " + r);
0823: exception.initCause((Throwable) r);
0824: }
0825: } else {
0826: if (failedOnAllBackends == null)
0827: failedOnAllBackends = new ArrayList();
0828: failedOnAllBackends.add(member);
0829: if (logger.isWarnEnabled())
0830: logger
0831: .warn("Unexpected answer from controller "
0832: + member
0833: + " ("
0834: + r
0835: + ") for request "
0836: + request
0837: .getSqlShortForm(vdb
0838: .getSqlShortFormLength()));
0839: }
0840: }
0841:
0842: if ((requestResult == null)
0843: && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
0844: // remote controller
0845: try {
0846: requestResult = (ControllerResultSet) getRequestResultFromFailoverCache(
0847: successfulControllers, request.getId());
0848: } catch (NoResultAvailableException e) {
0849: exception = new SQLException(
0850: "Request '"
0851: + request
0852: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
0853: }
0854: }
0855:
0856: /*
0857: * Notify all controllers where all backend failed (if any) that
0858: * completion was 'success'. We determine success from the local result or
0859: * the successful remote controllers even though they may have failed
0860: * afterwards (in which case requestResult may be null).
0861: */
0862: boolean success = (requestResult != null || successfulControllers != null);
0863:
0864: notifyRequestCompletion(request, success, true,
0865: failedOnAllBackends);
0866:
0867: // Notify controllers without active backends (if any)
0868: notifyRequestCompletion(request, success, false,
0869: controllersWithoutBackends);
0870:
0871: if (requestResult != null) {
0872: if (logger.isDebugEnabled())
0873: logger.debug("Request " + request.getId()
0874: + " completed successfully.");
0875: return requestResult;
0876: }
0877:
0878: // At this point, all controllers failed
0879:
0880: if (exception != null)
0881: throw exception;
0882: else {
0883: String msg = "Request '" + request
0884: + "' failed on all controllers";
0885: logger.warn(msg);
0886: throw new SQLException(msg);
0887: }
0888: } catch (SQLException e) {
0889: String msg = Translate.get("loadbalancer.request.failed",
0890: new String[] {
0891: request.getSqlShortForm(vdb
0892: .getSqlShortFormLength()),
0893: e.getMessage() });
0894: logger.warn(msg);
0895: throw e;
0896: }
0897: }
0898:
0899: /**
0900: * Execute a distributed request (usually a stored procedure) that returns a
0901: * StoredProcedureCallResult.
0902: *
0903: * @param proc the stored procedure to execute
0904: * @param requestMsg message to be sent through the group communication
0905: * @param messageTimeout group message timeout
0906: * @return the corresponding StoredProcedureCallResult
0907: * @throws SQLException if an error occurs
0908: */
0909: private StoredProcedureCallResult executeRequestReturningStoredProcedureCallResult(
0910: StoredProcedure proc, DistributedRequest requestMsg,
0911: long messageTimeout) throws SQLException {
0912: if (dvdb.isProcessMacroBeforeBroadcast())
0913: loadBalancer.handleMacros(proc);
0914: try {
0915: StoredProcedureCallResult result = null;
0916:
0917: List groupMembers = dvdb.getAllMembers();
0918:
0919: MulticastResponse responses = multicastRequest(proc,
0920: requestMsg, messageTimeout, groupMembers);
0921:
0922: // List of controllers that gave a AllBackendsFailedException
0923: ArrayList failedOnAllBackends = null;
0924: // List of controllers that have no more backends to execute queries
0925: ArrayList controllersWithoutBackends = null;
0926: // List of controllers that gave an inconsistent result
0927: ArrayList inconsistentControllers = null;
0928: SQLException exception = null;
0929: int size = groupMembers.size();
0930: List successfulControllers = null;
0931: // Get the result of each controller
0932: for (int i = 0; i < size; i++) {
0933: Member member = (Member) groupMembers.get(i);
0934: if ((responses.getFailedMembers() != null)
0935: && responses.getFailedMembers()
0936: .contains(member)) {
0937: logger.warn("Controller " + member
0938: + " is suspected of failure.");
0939: continue;
0940: }
0941: Object r = responses.getResult(member);
0942: if (r instanceof StoredProcedureCallResult) {
0943: if (result == null)
0944: result = (StoredProcedureCallResult) r;
0945:
0946: if (result.getResult() instanceof ExecuteUpdateResult
0947: && ((StoredProcedureCallResult) r)
0948: .getResult() instanceof ExecuteUpdateResult) {
0949: ExecuteUpdateResult requestResult = ((ExecuteUpdateResult) result
0950: .getResult());
0951: ExecuteUpdateResult executeUpdateResult = ((ExecuteUpdateResult) (((StoredProcedureCallResult) r)
0952: .getResult()));
0953: if (requestResult.getUpdateCount() != executeUpdateResult
0954: .getUpdateCount()) {
0955: if (inconsistentControllers == null)
0956: inconsistentControllers = new ArrayList();
0957: inconsistentControllers.add(member);
0958: logger
0959: .error("Controller "
0960: + member
0961: + " returned an inconsistent results ("
0962: + executeUpdateResult
0963: .getUpdateCount()
0964: + " when expecting "
0965: + requestResult
0966: .getUpdateCount()
0967: + ") for stored procedure "
0968: + proc.getId());
0969: }
0970: }
0971: } else if (DistributedRequestManager.SUCCESSFUL_COMPLETION
0972: .equals(r)) {
0973: // Remote controller success
0974: if (result == null) {
0975: if (successfulControllers == null)
0976: successfulControllers = new ArrayList();
0977: successfulControllers.add(member);
0978: }
0979: } else if (r instanceof NoMoreBackendException) {
0980: if (controllersWithoutBackends == null)
0981: controllersWithoutBackends = new ArrayList();
0982: controllersWithoutBackends.add(member);
0983: if (logger.isDebugEnabled())
0984: logger
0985: .debug("Controller "
0986: + member
0987: + " has no more backends to execute query ("
0988: + r + ")");
0989: } else if (r instanceof Exception) {
0990: if (failedOnAllBackends == null)
0991: failedOnAllBackends = new ArrayList();
0992: failedOnAllBackends.add(member);
0993: String msg = "Request " + proc.getId()
0994: + " failed on controller " + member + " ("
0995: + r + ")";
0996: logger.warn(msg);
0997: if (r instanceof SQLException)
0998: exception = (SQLException) r;
0999: else {
1000: exception = new SQLException(
1001: "Internal exception " + r);
1002: exception.initCause((Throwable) r);
1003: }
1004: } else {
1005: if (failedOnAllBackends == null)
1006: failedOnAllBackends = new ArrayList();
1007: failedOnAllBackends.add(member);
1008: if (logger.isWarnEnabled())
1009: logger
1010: .warn("Unexpected answer from controller "
1011: + member
1012: + " ("
1013: + r
1014: + ") for request "
1015: + proc
1016: .getSqlShortForm(vdb
1017: .getSqlShortFormLength()));
1018: }
1019: }
1020:
1021: if ((result == null) && (successfulControllers != null)) { // Local controller could not get the result, ask it to a successful
1022: // remote controller
1023:
1024: try {
1025: result = (StoredProcedureCallResult) getRequestResultFromFailoverCache(
1026: successfulControllers, proc.getId());
1027: } catch (NoResultAvailableException e) {
1028: exception = new SQLException(
1029: "Request '"
1030: + proc
1031: + "' was successfully executed on remote controllers, but all successful controllers failed before retrieving result");
1032: }
1033:
1034: }
1035:
1036: /*
1037: * Notify all controllers where all backend failed (if any) that
1038: * completion was 'success'. We determine success from the local result or
1039: * the successful remote controllers even though they may have failed
1040: * afterwards (in which case requestResult may be null).
1041: */
1042: boolean success = (result != null || successfulControllers != null);
1043:
1044: notifyRequestCompletion(proc, success, true,
1045: failedOnAllBackends);
1046:
1047: // Notify controllers without active backends (if any)
1048: notifyRequestCompletion(proc, success, false,
1049: controllersWithoutBackends);
1050:
1051: if (inconsistentControllers != null) { // Notify all inconsistent controllers to disable their backends
1052: notifyControllerInconsistency(proc,
1053: inconsistentControllers);
1054: }
1055:
1056: if (result != null) {
1057: proc.copyNamedAndOutParameters(result
1058: .getStoredProcedure());
1059: if (logger.isDebugEnabled())
1060: logger.debug("Request " + proc.getId()
1061: + " completed successfully.");
1062: return result;
1063: }
1064:
1065: // At this point, all controllers failed
1066:
1067: if (exception != null)
1068: throw exception;
1069: else {
1070: String msg = "Request '" + proc
1071: + "' failed on all controllers";
1072: logger.warn(msg);
1073: throw new SQLException(msg);
1074: }
1075: } catch (SQLException e) {
1076: String msg = Translate.get("loadbalancer.request.failed",
1077: new String[] {
1078: proc.getSqlShortForm(vdb
1079: .getSqlShortFormLength()),
1080: e.getMessage() });
1081: logger.warn(msg);
1082: throw e;
1083: }
1084: }
1085:
1086: /**
1087: * Multicast a request to a set of controllers.
1088: *
1089: * @param request the request that is executed
1090: * @param requestMsg the group message to send
1091: * @param messageTimeout timeout on the group message
1092: * @param groupMembers list of controllers to send the message to
1093: * @return the responses to the multicast
1094: * @throws SQLException if an error occurs
1095: */
1096: private MulticastResponse multicastRequest(AbstractRequest request,
1097: Serializable requestMsg, long messageTimeout,
1098: List groupMembers) throws SQLException {
1099: if (logger.isDebugEnabled())
1100: logger.debug("Broadcasting request "
1101: + request.getSqlShortForm(dvdb
1102: .getSqlShortFormLength())
1103: + (request.isAutoCommit() ? "" : " transaction "
1104: + request.getTransactionId())
1105: + " to all controllers ("
1106: + dvdb.getChannel().getLocalMembership() + "->"
1107: + groupMembers.toString() + ")");
1108:
1109: // Send the query to everybody including us
1110: MulticastResponse responses;
1111: try {
1112: // Warning! Message timeouts are in ms but request timeout is in seconds
1113: responses = dvdb.getMulticastRequestAdapter()
1114: .multicastMessage(
1115: groupMembers,
1116: requestMsg,
1117: MulticastRequestAdapter.WAIT_ALL,
1118: MessageTimeouts.getMinTimeout(
1119: messageTimeout, request
1120: .getTimeout() * 1000L));
1121: } catch (Exception e) {
1122: String msg = "An error occured while executing distributed request "
1123: + request.getId();
1124: logger.warn(msg, e);
1125: throw new SQLException(msg + " (" + e + ")");
1126: }
1127:
1128: if (logger.isDebugEnabled())
1129: logger.debug("Request "
1130: + request.getSqlShortForm(dvdb
1131: .getSqlShortFormLength()) + " completed.");
1132:
1133: if (responses.getFailedMembers() != null) { // Some controllers failed ... too bad !
1134: logger
1135: .warn(responses.getFailedMembers().size()
1136: + " controller(s) died during execution of request "
1137: + request.getId());
1138: }
1139: return responses;
1140: }
1141:
1142: /**
1143: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedAbort(String,
1144: * long)
1145: */
1146: public void distributedAbort(String login, long transactionId)
1147: throws SQLException {
1148: try {
1149: List groupMembers = dvdb.getAllMembers();
1150:
1151: AbstractRequest abortRequest = new UnknownWriteRequest(
1152: "abort", false, 0, "\n");
1153: abortRequest.setLogin(login);
1154: abortRequest.setTransactionId(transactionId);
1155: abortRequest.setIsAutoCommit(false);
1156:
1157: MulticastResponse responses = multicastRequest(
1158: abortRequest, new DistributedAbort(login,
1159: transactionId), dvdb.getMessageTimeouts()
1160: .getRollbackTimeout(), groupMembers);
1161:
1162: if (logger.isDebugEnabled())
1163: logger.debug("Abort of transaction " + transactionId
1164: + " completed.");
1165:
1166: // List of controllers that have no more backends to execute queries
1167: ArrayList controllersWithException = null;
1168: SQLException exception = null;
1169: int size = groupMembers.size();
1170: boolean success = false;
1171: // Get the result of each controller
1172: for (int i = 0; i < size; i++) {
1173: Member member = (Member) groupMembers.get(i);
1174: if ((responses.getFailedMembers() != null)
1175: && responses.getFailedMembers()
1176: .contains(member)) {
1177: logger.warn("Controller " + member
1178: + " is suspected of failure.");
1179: continue;
1180: }
1181: Object r = responses.getResult(member);
1182: if (r instanceof Boolean) {
1183: if (((Boolean) r).booleanValue())
1184: success = true;
1185: else
1186: logger
1187: .error("Unexpected result for controller "
1188: + member);
1189: } else if (r instanceof Exception) {
1190: if (controllersWithException == null)
1191: controllersWithException = new ArrayList();
1192: controllersWithException.add(member);
1193: if (logger.isDebugEnabled())
1194: logger
1195: .debug("Controller "
1196: + member
1197: + " has no more backends to abort transaction "
1198: + transactionId + " (" + r
1199: + ")");
1200: } else {
1201: if (logger.isWarnEnabled())
1202: logger
1203: .warn("Unexpected answer from controller "
1204: + member
1205: + " ("
1206: + r
1207: + ") for abort of transaction "
1208: + transactionId);
1209: }
1210: }
1211:
1212: // Notify all controllers that have thrown an exception
1213: notifyRequestCompletion(abortRequest, success, false,
1214: controllersWithException);
1215:
1216: if (success)
1217: return; // This is a success if at least one controller has succeeded
1218:
1219: if (exception != null)
1220: throw exception;
1221:
1222: // At this point, all controllers failed
1223:
1224: String msg = "Transaction " + transactionId
1225: + " failed to abort on all controllers";
1226: logger.warn(msg);
1227: throw new SQLException(msg);
1228: } catch (SQLException e) {
1229: String msg = "Transaction " + transactionId
1230: + " abort failed (" + e + ")";
1231: logger.warn(msg);
1232: throw e;
1233: }
1234: }
1235:
1236: /**
1237: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedCommit(String,
1238: * long)
1239: */
1240: public void distributedCommit(String login, long transactionId)
1241: throws SQLException {
1242: try {
1243: List groupMembers = dvdb.getAllMembers();
1244: AbstractRequest commitRequest = new UnknownWriteRequest(
1245: "commit", false, 0, "\n");
1246: commitRequest.setTransactionId(transactionId);
1247: commitRequest.setLogin(login);
1248: commitRequest.setIsAutoCommit(false);
1249:
1250: MulticastResponse responses = multicastRequest(
1251: commitRequest, new DistributedCommit(login,
1252: transactionId), dvdb.getMessageTimeouts()
1253: .getCommitTimeout(), groupMembers);
1254:
1255: if (logger.isDebugEnabled())
1256: logger.debug("Commit of transaction " + transactionId
1257: + " completed.");
1258:
1259: // List of controllers that gave a AllBackendsFailedException
1260: ArrayList failedOnAllBackends = null;
1261: // List of controllers that have no more backends to execute queries
1262: ArrayList controllersWithoutBackends = null;
1263: SQLException exception = null;
1264: // get a list that won't change while we go through it
1265: groupMembers = dvdb.getAllMembers();
1266: int size = groupMembers.size();
1267: boolean success = false;
1268: // Get the result of each controller
1269: for (int i = 0; i < size; i++) {
1270: Member member = (Member) groupMembers.get(i);
1271: if ((responses.getFailedMembers() != null)
1272: && responses.getFailedMembers()
1273: .contains(member)) {
1274: logger.warn("Controller " + member
1275: + " is suspected of failure.");
1276: continue;
1277: }
1278: Object r = responses.getResult(member);
1279: if (r instanceof Boolean) {
1280: if (((Boolean) r).booleanValue())
1281: success = true;
1282: else
1283: logger
1284: .error("Unexpected result for controller "
1285: + member);
1286: } else if (r instanceof NoMoreBackendException) {
1287: if (controllersWithoutBackends == null)
1288: controllersWithoutBackends = new ArrayList();
1289: controllersWithoutBackends.add(member);
1290: if (logger.isDebugEnabled())
1291: logger
1292: .debug("Controller "
1293: + member
1294: + " has no more backends to commit transaction "
1295: + transactionId + " (" + r
1296: + ")");
1297: } else if (r instanceof AllBackendsFailedException) {
1298: if (failedOnAllBackends == null)
1299: failedOnAllBackends = new ArrayList();
1300: failedOnAllBackends.add(member);
1301: if (logger.isDebugEnabled())
1302: logger
1303: .debug("Commit failed on all backends of controller "
1304: + member + " (" + r + ")");
1305: } else if (r instanceof SQLException) {
1306: if (failedOnAllBackends == null)
1307: failedOnAllBackends = new ArrayList();
1308: failedOnAllBackends.add(member);
1309: String msg = "Commit of transaction "
1310: + transactionId + " failed on controller "
1311: + member + " (" + r + ")";
1312: logger.warn(msg);
1313: exception = (SQLException) r;
1314: } else {
1315: if (failedOnAllBackends == null)
1316: failedOnAllBackends = new ArrayList();
1317: failedOnAllBackends.add(member);
1318: if (logger.isWarnEnabled())
1319: logger
1320: .warn("Unexpected answer from controller "
1321: + member
1322: + " ("
1323: + r
1324: + ") for commit of transaction "
1325: + transactionId);
1326: }
1327: }
1328:
1329: // Notify all controllers where all backend failed (if any) that
1330: // completion was 'success'
1331: notifyRequestCompletion(commitRequest, success, true,
1332: failedOnAllBackends);
1333:
1334: // Notify controllers without active backends (if any)
1335: notifyRequestCompletion(commitRequest, success, false,
1336: controllersWithoutBackends);
1337:
1338: if (success)
1339: return; // This is a success if at least one controller has succeeded
1340:
1341: // At this point, all controllers failed
1342:
1343: if (exception != null)
1344: throw exception;
1345: else {
1346: String msg = "Transaction " + transactionId
1347: + " failed to commit on all controllers";
1348: logger.warn(msg);
1349: throw new SQLException(msg);
1350: }
1351: } catch (SQLException e) {
1352: String msg = "Transaction " + transactionId
1353: + " commit failed (" + e + ")";
1354: logger.warn(msg);
1355: throw e;
1356: }
1357: }
1358:
1359: /**
1360: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedRollback(String,
1361: * long)
1362: */
1363: public void distributedRollback(String login, long transactionId)
1364: throws SQLException {
1365: try {
1366: List groupMembers = dvdb.getAllMembers();
1367:
1368: AbstractRequest rollbackRequest = new UnknownWriteRequest(
1369: "rollback", false, 0, "\n");
1370: rollbackRequest.setTransactionId(transactionId);
1371: rollbackRequest.setLogin(login);
1372: rollbackRequest.setIsAutoCommit(false);
1373:
1374: MulticastResponse responses = multicastRequest(
1375: rollbackRequest, new DistributedRollback(login,
1376: transactionId), dvdb.getMessageTimeouts()
1377: .getRollbackTimeout(), groupMembers);
1378:
1379: if (logger.isDebugEnabled())
1380: logger.debug("Rollback of transaction " + transactionId
1381: + " completed.");
1382:
1383: // List of controllers that gave a AllBackendsFailedException
1384: ArrayList failedOnAllBackends = null;
1385: // List of controllers that have no more backends to execute queries
1386: ArrayList controllersWithoutBackends = null;
1387: SQLException exception = null;
1388:
1389: // get a list that won't change while we go through it
1390: groupMembers = dvdb.getAllMembers();
1391: int size = groupMembers.size();
1392: boolean success = false;
1393: // Get the result of each controller
1394: for (int i = 0; i < size; i++) {
1395: Member member = (Member) groupMembers.get(i);
1396: if ((responses.getFailedMembers() != null)
1397: && responses.getFailedMembers()
1398: .contains(member)) {
1399: logger.warn("Controller " + member
1400: + " is suspected of failure.");
1401: continue;
1402: }
1403: Object r = responses.getResult(member);
1404: if (r instanceof Boolean) {
1405: if (((Boolean) r).booleanValue())
1406: success = true;
1407: else
1408: logger
1409: .error("Unexpected result for controller "
1410: + member);
1411: } else if (r instanceof NoMoreBackendException) {
1412: if (controllersWithoutBackends == null)
1413: controllersWithoutBackends = new ArrayList();
1414: controllersWithoutBackends.add(member);
1415: if (logger.isDebugEnabled())
1416: logger
1417: .debug("Controller "
1418: + member
1419: + " has no more backends to rollback transaction "
1420: + transactionId + " (" + r
1421: + ")");
1422: } else if (r instanceof AllBackendsFailedException) {
1423: if (failedOnAllBackends == null)
1424: failedOnAllBackends = new ArrayList();
1425: failedOnAllBackends.add(member);
1426: if (logger.isDebugEnabled())
1427: logger
1428: .debug("Rollback failed on all backends of controller "
1429: + member + " (" + r + ")");
1430: } else if (r instanceof SQLException) {
1431: if (failedOnAllBackends == null)
1432: failedOnAllBackends = new ArrayList();
1433: failedOnAllBackends.add(member);
1434: String msg = "Rollback of transaction "
1435: + transactionId + " failed on controller "
1436: + member + " (" + r + ")";
1437: logger.warn(msg);
1438: exception = (SQLException) r;
1439: } else {
1440: if (failedOnAllBackends == null)
1441: failedOnAllBackends = new ArrayList();
1442: failedOnAllBackends.add(member);
1443: if (logger.isWarnEnabled())
1444: logger
1445: .warn("Unexpected answer from controller "
1446: + member
1447: + " ("
1448: + r
1449: + ") for rollback of transaction "
1450: + transactionId);
1451: }
1452: }
1453:
1454: // Notify all controllers where all backend failed (if any) that
1455: // completion was 'success'
1456: notifyRequestCompletion(rollbackRequest, success, true,
1457: failedOnAllBackends);
1458:
1459: // Notify controllers without active backends (if any)
1460: notifyRequestCompletion(rollbackRequest, success, false,
1461: controllersWithoutBackends);
1462:
1463: if (success)
1464: return; // This is a success if at least one controller has succeeded
1465:
1466: if (exception != null)
1467: throw exception;
1468:
1469: // At this point, all controllers failed
1470:
1471: String msg = "Transaction " + transactionId
1472: + " failed to rollback on all controllers";
1473: logger.warn(msg);
1474: throw new SQLException(msg);
1475: } catch (SQLException e) {
1476: String msg = "Transaction " + transactionId
1477: + " rollback failed (" + e + ")";
1478: logger.warn(msg);
1479: throw e;
1480: }
1481: }
1482:
1483: /**
1484: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedRollback(String,
1485: * long, String)
1486: */
1487: public void distributedRollback(String login, long transactionId,
1488: String savepointName) throws SQLException {
1489: try {
1490: List groupMembers = dvdb.getAllMembers();
1491:
1492: AbstractRequest rollbackRequest = new UnknownWriteRequest(
1493: "rollback " + savepointName, false, 0, "\n");
1494: rollbackRequest.setTransactionId(transactionId);
1495: rollbackRequest.setLogin(login);
1496: rollbackRequest.setIsAutoCommit(false);
1497:
1498: MulticastResponse responses = multicastRequest(
1499: rollbackRequest,
1500: new DistributedRollbackToSavepoint(transactionId,
1501: savepointName), dvdb.getMessageTimeouts()
1502: .getRollbackToSavepointTimeout(),
1503: groupMembers);
1504:
1505: if (logger.isDebugEnabled())
1506: logger.debug("Rollback to savepoint " + savepointName
1507: + " for " + "transaction " + transactionId
1508: + " completed.");
1509:
1510: // List of controllers that gave a AllBackendsFailedException
1511: ArrayList failedOnAllBackends = null;
1512: // List of controllers that have no more backends to execute queries
1513: ArrayList controllersWithoutBackends = null;
1514: SQLException exception = null;
1515:
1516: // get a list that won't change while we go through it
1517: groupMembers = dvdb.getAllMembers();
1518: int size = groupMembers.size();
1519: boolean success = false;
1520: // Get the result of each controller
1521: for (int i = 0; i < size; i++) {
1522: Member member = (Member) groupMembers.get(i);
1523: if ((responses.getFailedMembers() != null)
1524: && responses.getFailedMembers()
1525: .contains(member)) {
1526: logger.warn("Controller " + member
1527: + " is suspected of failure.");
1528: continue;
1529: }
1530: Object r = responses.getResult(member);
1531: if (r instanceof Boolean) {
1532: if (((Boolean) r).booleanValue())
1533: success = true;
1534: else
1535: logger
1536: .error("Unexpected result for controller "
1537: + member);
1538: } else if (r instanceof NoMoreBackendException) {
1539: if (controllersWithoutBackends == null)
1540: controllersWithoutBackends = new ArrayList();
1541: controllersWithoutBackends.add(member);
1542: if (logger.isDebugEnabled())
1543: logger.debug("Controller " + member
1544: + " has no more backends to "
1545: + "rollback to savepoint "
1546: + savepointName + " for "
1547: + "transaction " + transactionId + " ("
1548: + r + ")");
1549: } else if (r instanceof AllBackendsFailedException) {
1550: if (failedOnAllBackends == null)
1551: failedOnAllBackends = new ArrayList();
1552: failedOnAllBackends.add(member);
1553: if (logger.isDebugEnabled())
1554: logger
1555: .debug("rollback to savepoint failed on all backends of "
1556: + "controller "
1557: + member
1558: + " ("
1559: + r + ")");
1560: } else if (r instanceof SQLException) {
1561: if (failedOnAllBackends == null)
1562: failedOnAllBackends = new ArrayList();
1563: failedOnAllBackends.add(member);
1564: String msg = "rollback to savepoint "
1565: + savepointName + " for " + "transaction "
1566: + transactionId + " failed on controller "
1567: + member + " (" + r + ")";
1568: logger.warn(msg);
1569: exception = (SQLException) r;
1570: } else {
1571: if (failedOnAllBackends == null)
1572: failedOnAllBackends = new ArrayList();
1573: failedOnAllBackends.add(member);
1574: if (logger.isWarnEnabled())
1575: logger
1576: .warn("Unexpected answer from controller "
1577: + member
1578: + " ("
1579: + r
1580: + ") for rollback to savepoint "
1581: + savepointName
1582: + " of transaction "
1583: + transactionId);
1584: }
1585: }
1586:
1587: // Notify all controllers where all backend failed (if any) that
1588: // completion was 'success'
1589: notifyRequestCompletion(rollbackRequest, success, true,
1590: failedOnAllBackends);
1591:
1592: // Notify controllers without active backends (if any)
1593: notifyRequestCompletion(rollbackRequest, success, false,
1594: controllersWithoutBackends);
1595:
1596: if (success)
1597: return; // This is a success if at least one controller has succeeded
1598:
1599: if (exception != null)
1600: throw exception;
1601:
1602: // At this point, all controllers failed
1603:
1604: if (exception != null)
1605: throw exception;
1606: else {
1607: String msg = "Rollback to savepoint " + savepointName
1608: + " for " + "transaction " + transactionId
1609: + " failed on all controllers";
1610: logger.warn(msg);
1611: throw new SQLException(msg);
1612: }
1613: } catch (SQLException e) {
1614: String msg = "Rollback to savepoint " + savepointName
1615: + " for " + "transaction " + transactionId
1616: + " failed (" + e + ")";
1617: logger.warn(msg);
1618: throw e;
1619: }
1620: }
1621:
1622: /**
1623: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedSetSavepoint(String,
1624: * long, String)
1625: */
1626: public void distributedSetSavepoint(String login,
1627: long transactionId, String name) throws SQLException {
1628: try {
1629: List groupMembers = dvdb.getAllMembers();
1630:
1631: AbstractRequest setSavepointRequest = new UnknownWriteRequest(
1632: "savepoint " + name, false, 0, "\n");
1633: setSavepointRequest.setTransactionId(transactionId);
1634: setSavepointRequest.setLogin(login);
1635: setSavepointRequest.setIsAutoCommit(false);
1636:
1637: MulticastResponse responses = multicastRequest(
1638: setSavepointRequest, new DistributedSetSavepoint(
1639: login, transactionId, name), dvdb
1640: .getMessageTimeouts()
1641: .getSetSavepointTimeout(), groupMembers);
1642:
1643: if (logger.isDebugEnabled())
1644: logger.debug("Set savepoint " + name
1645: + " to transaction " + transactionId
1646: + " completed.");
1647:
1648: // List of controllers that gave a AllBackendsFailedException
1649: ArrayList failedOnAllBackends = null;
1650: // List of controllers that have no more backends to execute queries
1651: ArrayList controllersWithoutBackends = null;
1652: SQLException exception = null;
1653:
1654: // get a list that won't change while we go through it
1655: groupMembers = dvdb.getAllMembers();
1656: int size = groupMembers.size();
1657: boolean success = false;
1658: // Get the result of each controller
1659: for (int i = 0; i < size; i++) {
1660: Member member = (Member) groupMembers.get(i);
1661: if ((responses.getFailedMembers() != null)
1662: && responses.getFailedMembers()
1663: .contains(member)) {
1664: logger.warn("Controller " + member
1665: + " is suspected of failure.");
1666: continue;
1667: }
1668: Object r = responses.getResult(member);
1669: if (r instanceof Boolean) {
1670: if (((Boolean) r).booleanValue())
1671: success = true;
1672: else
1673: logger
1674: .error("Unexpected result for controller "
1675: + member);
1676: } else if (r instanceof NoMoreBackendException) {
1677: if (controllersWithoutBackends == null)
1678: controllersWithoutBackends = new ArrayList();
1679: controllersWithoutBackends.add(member);
1680: if (logger.isDebugEnabled())
1681: logger.debug("Controller " + member
1682: + " has no more backends to "
1683: + "set savepoint " + name
1684: + " to transaction " + transactionId
1685: + " (" + r + ")");
1686: } else if (r instanceof AllBackendsFailedException) {
1687: if (failedOnAllBackends == null)
1688: failedOnAllBackends = new ArrayList();
1689: failedOnAllBackends.add(member);
1690: if (logger.isDebugEnabled())
1691: logger
1692: .debug("set savepoint failed on all backends of controller "
1693: + member + " (" + r + ")");
1694: } else if (r instanceof SQLException) {
1695: if (failedOnAllBackends == null)
1696: failedOnAllBackends = new ArrayList();
1697: failedOnAllBackends.add(member);
1698: String msg = "set savepoint " + name
1699: + " to transaction " + transactionId
1700: + " failed on controller " + member + " ("
1701: + r + ")";
1702: logger.warn(msg);
1703: exception = (SQLException) r;
1704: } else {
1705: if (failedOnAllBackends == null)
1706: failedOnAllBackends = new ArrayList();
1707: failedOnAllBackends.add(member);
1708: if (logger.isWarnEnabled())
1709: logger
1710: .warn("Unexpected answer from controller "
1711: + member
1712: + " ("
1713: + r
1714: + ") for set savepoint "
1715: + name
1716: + " of transaction "
1717: + transactionId);
1718: }
1719: }
1720:
1721: // Notify all controllers where all backend failed (if any) that
1722: // completion was 'success'
1723: notifyRequestCompletion(setSavepointRequest, success, true,
1724: failedOnAllBackends);
1725:
1726: // Notify controllers without active backends (if any)
1727: notifyRequestCompletion(setSavepointRequest, success,
1728: false, controllersWithoutBackends);
1729:
1730: if (success)
1731: return; // This is a success if at least one controller has succeeded
1732:
1733: if (exception != null)
1734: throw exception;
1735:
1736: // At this point, all controllers failed
1737:
1738: if (exception != null)
1739: throw exception;
1740: else {
1741: String msg = "Set savepoint " + name
1742: + " to transaction " + transactionId
1743: + " failed on all controllers";
1744: logger.warn(msg);
1745: throw new SQLException(msg);
1746: }
1747: } catch (SQLException e) {
1748: String msg = "Set savepoint " + name + " to transaction "
1749: + transactionId + " failed (" + e + ")";
1750: logger.warn(msg);
1751: throw e;
1752: }
1753: }
1754:
1755: /**
1756: * @see org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager#distributedReleaseSavepoint(String,
1757: * long, String)
1758: */
1759: public void distributedReleaseSavepoint(String login,
1760: long transactionId, String name) throws SQLException {
1761: try {
1762: List groupMembers = dvdb.getAllMembers();
1763:
1764: AbstractRequest releaseSavepointRequest = new UnknownWriteRequest(
1765: "release " + name, false, 0, "\n");
1766: releaseSavepointRequest.setTransactionId(transactionId);
1767: releaseSavepointRequest.setLogin(login);
1768: releaseSavepointRequest.setIsAutoCommit(false);
1769:
1770: MulticastResponse responses = multicastRequest(
1771: releaseSavepointRequest,
1772: new DistributedReleaseSavepoint(transactionId, name),
1773: dvdb.getMessageTimeouts()
1774: .getReleaseSavepointTimeout(), groupMembers);
1775:
1776: if (logger.isDebugEnabled())
1777: logger.debug("Release savepoint " + name
1778: + " from transaction " + transactionId
1779: + " completed.");
1780:
1781: // List of controllers that gave a AllBackendsFailedException
1782: ArrayList failedOnAllBackends = null;
1783: // List of controllers that have no more backends to execute queries
1784: ArrayList controllersWithoutBackends = null;
1785: SQLException exception = null;
1786:
1787: // get a list that won't change while we go through it
1788: groupMembers = dvdb.getAllMembers();
1789: int size = groupMembers.size();
1790: boolean success = false;
1791: // Get the result of each controller
1792: for (int i = 0; i < size; i++) {
1793: Member member = (Member) groupMembers.get(i);
1794: if ((responses.getFailedMembers() != null)
1795: && responses.getFailedMembers()
1796: .contains(member)) {
1797: logger.warn("Controller " + member
1798: + " is suspected of failure.");
1799: continue;
1800: }
1801: Object r = responses.getResult(member);
1802: if (r instanceof Boolean) {
1803: if (((Boolean) r).booleanValue())
1804: success = true;
1805: else
1806: logger
1807: .error("Unexpected result for controller "
1808: + member);
1809: } else if (r instanceof NoMoreBackendException) {
1810: if (controllersWithoutBackends == null)
1811: controllersWithoutBackends = new ArrayList();
1812: controllersWithoutBackends.add(member);
1813: if (logger.isDebugEnabled())
1814: logger.debug("Controller " + member
1815: + " has no more backends to "
1816: + "release savepoint " + name
1817: + " from transaction " + transactionId
1818: + " (" + r + ")");
1819: } else if (r instanceof AllBackendsFailedException) {
1820: if (failedOnAllBackends == null)
1821: failedOnAllBackends = new ArrayList();
1822: failedOnAllBackends.add(member);
1823: if (logger.isDebugEnabled())
1824: logger
1825: .debug("release savepoint failed on all backends of "
1826: + "controller "
1827: + member
1828: + " ("
1829: + r + ")");
1830: } else if (r instanceof SQLException) {
1831: if (failedOnAllBackends == null)
1832: failedOnAllBackends = new ArrayList();
1833: failedOnAllBackends.add(member);
1834: String msg = "release savepoint " + name
1835: + " from transaction " + transactionId
1836: + " failed on controller " + member + " ("
1837: + r + ")";
1838: logger.warn(msg);
1839: exception = (SQLException) r;
1840: } else {
1841: if (failedOnAllBackends == null)
1842: failedOnAllBackends = new ArrayList();
1843: failedOnAllBackends.add(member);
1844: if (logger.isWarnEnabled())
1845: logger
1846: .warn("Unexpected answer from controller "
1847: + member
1848: + " ("
1849: + r
1850: + ") for release savepoint "
1851: + name
1852: + " of transaction "
1853: + transactionId);
1854: }
1855: }
1856:
1857: // Notify all controllers where all backend failed (if any) that
1858: // completion was 'success'
1859: notifyRequestCompletion(releaseSavepointRequest, success,
1860: true, failedOnAllBackends);
1861:
1862: // Notify controllers without active backends (if any)
1863: notifyRequestCompletion(releaseSavepointRequest, success,
1864: false, controllersWithoutBackends);
1865:
1866: if (success)
1867: return; // This is a success if at least one controller has succeeded
1868:
1869: if (exception != null)
1870: throw exception;
1871:
1872: // At this point, all controllers failed
1873:
1874: if (exception != null)
1875: throw exception;
1876: else {
1877: String msg = "Release savepoint " + name
1878: + " from transaction " + transactionId
1879: + " failed on all controllers";
1880: logger.warn(msg);
1881: throw new SQLException(msg);
1882: }
1883: } catch (SQLException e) {
1884: String msg = "Release savepoint " + name
1885: + " from transaction " + transactionId
1886: + " failed (" + e + ")";
1887: logger.warn(msg);
1888: throw e;
1889: }
1890: }
1891:
1892: }
|