0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet
0022: * Contributor(s): ______________________
0023: */package org.continuent.sequoia.controller.loadbalancer.raidb1;
0024:
0025: import java.sql.Connection;
0026: import java.sql.SQLException;
0027: import java.util.ArrayList;
0028: import java.util.List;
0029:
0030: import javax.management.ObjectName;
0031:
0032: import org.continuent.sequoia.common.exceptions.BadConnectionException;
0033: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0034: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0035: import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
0036: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0037: import org.continuent.sequoia.common.i18n.Translate;
0038: import org.continuent.sequoia.common.jmx.JmxConstants;
0039: import org.continuent.sequoia.common.log.Trace;
0040: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0041: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0042: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0043: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0044: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0045: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0046: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0047: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0048: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0049: import org.continuent.sequoia.controller.connection.PooledConnection;
0050: import org.continuent.sequoia.controller.jmx.MBeanServerManager;
0051: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0052: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0053: import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
0054: import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueuesControl;
0055: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0056: import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0057: import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask;
0058: import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask;
0059: import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask;
0060: import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
0061: import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
0062: import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
0063: import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
0064: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0065: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
0066: import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
0067: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteQueryTask;
0068: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteTask;
0069: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask;
0070: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask;
0071: import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0072: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0073: import org.continuent.sequoia.controller.requests.AbstractRequest;
0074: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0075: import org.continuent.sequoia.controller.requests.ParsingGranularities;
0076: import org.continuent.sequoia.controller.requests.SelectRequest;
0077: import org.continuent.sequoia.controller.requests.StoredProcedure;
0078: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0079: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
0080: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0081: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0082: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0083: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0084: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0085: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0086:
0087: /**
0088: * RAIDb-1 load balancer.
0089: * <p>
0090: * This class is an abstract call because the read requests coming from the
0091: * request controller are NOT treated here but in the subclasses. Transaction
0092: * management and write requests are broadcasted to all backends.
0093: *
0094: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0095: * @version 1.0
0096: */
0097: public abstract class RAIDb1 extends AbstractLoadBalancer {
0098: //
0099: // How the code is organized ?
0100: //
0101: // 1. Member variables
0102: // 2. Constructor(s)
0103: // 3. Request handling
0104: // 4. Transaction handling
0105: // 5. Backend management
0106: //
0107:
0108: protected static Trace logger = Trace
0109: .getLogger("org.continuent.sequoia.controller.loadbalancer.RAIDb1");
0110:
0111: protected static Trace endUserLogger = Trace
0112: .getLogger("org.continuent.sequoia.enduser");
0113:
0114: /*
0115: * Constructors
0116: */
0117:
0118: /**
0119: * Creates a new RAIDb-1 Round Robin request load balancer. A new backend
0120: * worker thread is created for each backend.
0121: *
0122: * @param vdb the virtual database this load balancer belongs to.
0123: * @param waitForCompletionPolicy How many backends must complete before
0124: * returning the result?
0125: * @exception Exception if an error occurs
0126: */
0127: public RAIDb1(VirtualDatabase vdb,
0128: WaitForCompletionPolicy waitForCompletionPolicy)
0129: throws Exception {
0130: super (vdb, RAIDbLevels.RAIDb1, ParsingGranularities.TABLE);
0131: this .waitForCompletionPolicy = waitForCompletionPolicy;
0132: }
0133:
0134: /*
0135: * Request Handling
0136: */
0137:
0138: /**
0139: * Perform a read request. If request.isMustBroadcast() is set, then the query
0140: * is broadcasted to all nodes else a single node is chosen according to the
0141: * load balancing policy.
0142: *
0143: * @param request the <code>SelectRequest</code> to execute
0144: * @param metadataCache MetadataCache (null if none)
0145: * @return the corresponding <code>ControllerResultSet</code>
0146: * @exception SQLException if an error occurs
0147: * @throws AllBackendsFailedException if all backends failed to execute the
0148: * request
0149: */
0150: public ControllerResultSet statementExecuteQuery(
0151: SelectRequest request, MetadataCache metadataCache)
0152: throws SQLException, AllBackendsFailedException {
0153: if (request.isMustBroadcast())
0154: return execBroadcastReadRequest(request, metadataCache);
0155: else
0156: return execSingleBackendReadRequest(request, metadataCache);
0157: }
0158:
0159: /**
0160: * Implementation specific execution of a request on a single backend chosen
0161: * according to the load balancing strategy.
0162: *
0163: * @param request the request to execute
0164: * @param metadataCache the metadata cache if any or null
0165: * @return the ResultSet
0166: * @throws SQLException if an error occurs
0167: */
0168: public abstract ControllerResultSet execSingleBackendReadRequest(
0169: SelectRequest request, MetadataCache metadataCache)
0170: throws SQLException;
0171:
0172: /**
0173: * Broadcast a read request execution on all backends. This is similar to a
0174: * write execution and is useful for queries such as SELECT ... FOR UPDATE.
0175: *
0176: * @param request the <code>SelectRequest</code> to execute
0177: * @param metadataCache MetadataCache (null if none)
0178: * @return the corresponding <code>ControllerResultSet</code>
0179: * @exception SQLException if an error occurs
0180: * @throws AllBackendsFailedException if all backends failed to execute the
0181: * request
0182: * @throws NoMoreBackendException if no backend was available to execute the
0183: * stored procedure
0184: */
0185: private ControllerResultSet execBroadcastReadRequest(
0186: SelectRequest request, MetadataCache metadataCache)
0187: throws SQLException, AllBackendsFailedException,
0188: NoMoreBackendException {
0189: // Handle macros
0190: handleMacros(request);
0191:
0192: // Total ordering for distributed virtual databases.
0193: boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0194: true);
0195:
0196: // Log lazy begin if needed
0197: if (request.isLazyTransactionStart())
0198: this .vdb.getRequestManager().logLazyTransactionBegin(
0199: request.getTransactionId());
0200:
0201: // Log request
0202: if (recoveryLog != null)
0203: request.setLogId(recoveryLog.logRequestExecuting(request));
0204:
0205: int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0206: String.valueOf(request.getId()));
0207:
0208: // Create the task and just wait for the first node to return
0209: StatementExecuteQueryTask task = new StatementExecuteQueryTask(
0210: 1, nbOfThreads, request, metadataCache);
0211:
0212: // Selects are always posted in the non conflicting queue
0213: atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
0214: removeFromTotalOrderQueue);
0215:
0216: synchronized (task) {
0217: if (!task.hasCompleted())
0218: waitForTaskCompletion(request.getTimeout() * 1000L,
0219: String.valueOf(request.getId()), task);
0220:
0221: checkTaskCompletion(task);
0222: }
0223:
0224: // Update log with success
0225: if (recoveryLog != null)
0226: recoveryLog.logRequestCompletion(request.getLogId(), true,
0227: request.getExecTimeInMs());
0228:
0229: return task.getResult();
0230: }
0231:
0232: /**
0233: * Performs a write request. This request is broadcasted to all nodes.
0234: *
0235: * @param request an <code>AbstractWriteRequest</code>
0236: * @return number of rows affected by the request
0237: * @throws AllBackendsFailedException if all backends failed to execute the
0238: * request
0239: * @exception NoMoreBackendException if no backends are left to execute the
0240: * request
0241: * @exception SQLException if an error occurs
0242: */
0243: public ExecuteUpdateResult statementExecuteUpdate(
0244: AbstractWriteRequest request)
0245: throws AllBackendsFailedException, NoMoreBackendException,
0246: SQLException {
0247: return ((StatementExecuteUpdateTask) execWriteRequest(request,
0248: false, null)).getResult();
0249: }
0250:
0251: /**
0252: * Perform a write request and return the auto generated keys.
0253: *
0254: * @param request the request to execute
0255: * @param metadataCache the metadataCache if any or null
0256: * @return update count and auto generated keys.
0257: * @throws AllBackendsFailedException if all backends failed to execute the
0258: * request
0259: * @exception NoMoreBackendException if no backends are left to execute the
0260: * request
0261: * @exception SQLException if an error occurs
0262: */
0263: public GeneratedKeysResult statementExecuteUpdateWithKeys(
0264: AbstractWriteRequest request, MetadataCache metadataCache)
0265: throws AllBackendsFailedException, NoMoreBackendException,
0266: SQLException {
0267: return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(
0268: request, true, metadataCache)).getResult();
0269: }
0270:
0271: /**
0272: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
0273: * MetadataCache)
0274: */
0275: public ExecuteResult statementExecute(AbstractRequest request,
0276: MetadataCache metadataCache) throws SQLException,
0277: AllBackendsFailedException {
0278: StatementExecuteTask task = (StatementExecuteTask) callStoredProcedure(
0279: request, STATEMENT_EXECUTE_TASK, metadataCache);
0280: return task.getResult();
0281: }
0282:
0283: /**
0284: * Common code for execWriteRequest(AbstractWriteRequest) and
0285: * execWriteRequestWithKeys(AbstractWriteRequest).
0286: * <p>
0287: * Note that macros are processed here.
0288: * <p>
0289: * The result is given back in AbstractTask.getResult().
0290: *
0291: * @param request the request to execute
0292: * @param useKeys true if this must give an auto generated keys ResultSet
0293: * @param metadataCache the metadataCache if any or null
0294: * @throws AllBackendsFailedException if all backends failed to execute the
0295: * request
0296: * @throws SQLException if an error occurs
0297: */
0298: private AbstractTask execWriteRequest(AbstractWriteRequest request,
0299: boolean useKeys, MetadataCache metadataCache)
0300: throws AllBackendsFailedException, NoMoreBackendException,
0301: SQLException {
0302: // Handle macros
0303: handleMacros(request);
0304:
0305: // Total ordering mainly for distributed virtual databases.
0306: boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0307: true);
0308:
0309: // Log lazy begin if needed
0310: if (request.isLazyTransactionStart())
0311: this .vdb.getRequestManager().logLazyTransactionBegin(
0312: request.getTransactionId());
0313:
0314: // Log request
0315: if (recoveryLog != null)
0316: recoveryLog.logRequestExecuting(request);
0317:
0318: int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0319: String.valueOf(request.getId()));
0320:
0321: // Create the task
0322: AbstractTask task;
0323: if (useKeys)
0324: task = new StatementExecuteUpdateWithKeysTask(
0325: getNbToWait(nbOfThreads), nbOfThreads, request,
0326: metadataCache);
0327: else
0328: task = new StatementExecuteUpdateTask(
0329: getNbToWait(nbOfThreads), nbOfThreads, request);
0330:
0331: atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
0332: removeFromTotalOrderQueue);
0333:
0334: try {
0335: synchronized (task) {
0336: if (!task.hasCompleted())
0337: waitForTaskCompletion(request.getTimeout() * 1000L,
0338: String.valueOf(request.getId()), task);
0339:
0340: checkTaskCompletion(task);
0341: return task;
0342: }
0343: } finally {
0344: if (!request.isAutoCommit()) { // Check that transaction was not aborted in parallel
0345: try {
0346: this .vdb
0347: .getRequestManager()
0348: .getTransactionMetaData(
0349: new Long(request.getTransactionId()));
0350: } catch (SQLException e) { // Transaction was aborted it cannot be found anymore in the active
0351: // transaction list. Force an abort
0352: logger
0353: .info("Concurrent abort detected, re-enforcing abort of transaction "
0354: + request.getTransactionId());
0355: abort(new TransactionMetaData(request
0356: .getTransactionId(), 0, request.getLogin(),
0357: request.isPersistentConnection(), request
0358: .getPersistentConnectionId()));
0359: }
0360: }
0361: }
0362: }
0363:
0364: protected static final int STATEMENT_EXECUTE_QUERY = 0;
0365: protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1;
0366: protected static final int CALLABLE_STATEMENT_EXECUTE = 2;
0367:
0368: /**
0369: * Execute a read request on the selected backend.
0370: *
0371: * @param request the request to execute
0372: * @param backend the backend that will execute the request
0373: * @param metadataCache the metadataCache if any or null
0374: * @return the ResultSet
0375: * @throws SQLException if an error occurs
0376: */
0377: protected ControllerResultSet executeRequestOnBackend(
0378: SelectRequest request, DatabaseBackend backend,
0379: MetadataCache metadataCache) throws SQLException,
0380: UnreachableBackendException {
0381: // Handle macros
0382: handleMacros(request);
0383:
0384: // Ok, we have a backend, let's execute the request
0385: AbstractConnectionManager cm = backend
0386: .getConnectionManager(request.getLogin());
0387:
0388: // Sanity check
0389: if (cm == null) {
0390: String msg = Translate.get(
0391: "loadbalancer.connectionmanager.not.found",
0392: new String[] { request.getLogin(),
0393: backend.getName() });
0394: logger.error(msg);
0395: throw new SQLException(msg);
0396: }
0397:
0398: // Execute the query
0399: if (request.isAutoCommit()) {
0400: ControllerResultSet rs = null;
0401: boolean badConnection;
0402: do {
0403: badConnection = false;
0404: // Use a connection just for this request
0405: PooledConnection c = null;
0406: try {
0407: c = cm.retrieveConnectionInAutoCommit(request);
0408: } catch (UnreachableBackendException e1) {
0409: String msg = Translate
0410: .get(
0411: "loadbalancer.backend.disabling.unreachable",
0412: backend.getName());
0413: logger.error(msg);
0414: endUserLogger.error(msg);
0415: disableBackend(backend, true);
0416: throw new UnreachableBackendException(Translate
0417: .get("loadbalancer.backend.unreacheable",
0418: backend.getName()));
0419: }
0420:
0421: // Sanity check
0422: if (c == null)
0423: throw new UnreachableBackendException(Translate
0424: .get("loadbalancer.backend.no.connection",
0425: backend.getName()));
0426:
0427: // Execute Query
0428: try {
0429: rs = executeStatementExecuteQueryOnBackend(request,
0430: backend, null, c.getConnection(),
0431: metadataCache);
0432: cm.releaseConnectionInAutoCommit(request, c);
0433: } catch (SQLException e) {
0434: cm.releaseConnectionInAutoCommit(request, c);
0435: throw SQLExceptionFactory
0436: .getSQLException(
0437: e,
0438: Translate
0439: .get(
0440: "loadbalancer.request.failed.on.backend",
0441: new String[] {
0442: request
0443: .getSqlShortForm(vdb
0444: .getSqlShortFormLength()),
0445: backend
0446: .getName(),
0447: e
0448: .getMessage() }));
0449: } catch (BadConnectionException e) { // Get rid of the bad connection
0450: cm.deleteConnection(c);
0451: if (request.isPersistentConnection()) {
0452: cm.deletePersistentConnection(request
0453: .getPersistentConnectionId());
0454: }
0455: badConnection = true;
0456: } catch (UnreachableBackendException e) {
0457: String msg = Translate
0458: .get(
0459: "loadbalancer.backend.disabling.unreachable",
0460: backend.getName());
0461: logger.error(msg);
0462: endUserLogger.error(msg);
0463: disableBackend(backend, true);
0464: throw new UnreachableBackendException(Translate
0465: .get("loadbalancer.backend.unreacheable",
0466: backend.getName()));
0467: } catch (Throwable e) {
0468:
0469: logger.error("Unexpected exception:", e);
0470: cm.releaseConnectionInAutoCommit(request, c);
0471: throw new SQLException(
0472: Translate
0473: .get(
0474: "loadbalancer.request.failed.on.backend",
0475: new String[] {
0476: request
0477: .getSqlShortForm(vdb
0478: .getSqlShortFormLength()),
0479: backend.getName(),
0480: e.getMessage() }));
0481: }
0482: } while (badConnection);
0483: if (logger.isDebugEnabled())
0484: logger.debug(Translate.get("loadbalancer.execute.on",
0485: new String[] { String.valueOf(request.getId()),
0486: backend.getName() }));
0487: return rs;
0488: } else { // Inside a transaction
0489: Connection c;
0490: long tid = request.getTransactionId();
0491:
0492: try {
0493: c = backend
0494: .getConnectionForTransactionAndLazyBeginIfNeeded(
0495: request, cm);
0496: } catch (UnreachableBackendException e1) {
0497: String msg = Translate.get(
0498: "loadbalancer.backend.disabling.unreachable",
0499: backend.getName());
0500: logger.error(msg);
0501: endUserLogger.error(msg);
0502: disableBackend(backend, true);
0503: throw new UnreachableBackendException(Translate.get(
0504: "loadbalancer.backend.unreacheable", backend
0505: .getName()));
0506: } catch (NoTransactionStartWhenDisablingException e) {
0507: String msg = Translate.get(
0508: "loadbalancer.backend.is.disabling",
0509: new String[] {
0510: request.getSqlShortForm(vdb
0511: .getSqlShortFormLength()),
0512: backend.getName() });
0513: logger.error(msg);
0514: throw new UnreachableBackendException(msg);
0515: }
0516:
0517: // Sanity check
0518: if (c == null)
0519: throw new SQLException(Translate.get(
0520: "loadbalancer.unable.retrieve.connection",
0521: new String[] { String.valueOf(tid),
0522: backend.getName() }));
0523:
0524: // Execute Query
0525: ControllerResultSet rs = null;
0526: try {
0527: rs = executeStatementExecuteQueryOnBackend(request,
0528: backend, null, c, metadataCache);
0529: } catch (SQLException e) {
0530: throw SQLExceptionFactory
0531: .getSQLException(
0532: e,
0533: Translate
0534: .get(
0535: "loadbalancer.request.failed.on.backend",
0536: new String[] {
0537: request
0538: .getSqlShortForm(vdb
0539: .getSqlShortFormLength()),
0540: backend
0541: .getName(),
0542: e.getMessage() }));
0543: } catch (BadConnectionException e) { // Connection failed, so did the transaction
0544: // Disable the backend.
0545: cm.deleteConnection(tid);
0546: String msg = Translate
0547: .get(
0548: "loadbalancer.backend.disabling.connection.failure",
0549: backend.getName());
0550: logger.error(msg);
0551: endUserLogger.error(msg);
0552: disableBackend(backend, true);
0553: throw new UnreachableBackendException(msg);
0554: } catch (UnreachableBackendException e) {
0555: String msg = Translate.get(
0556: "loadbalancer.backend.disabling.unreachable",
0557: backend.getName());
0558: logger.error(msg);
0559: endUserLogger.error(msg);
0560: disableBackend(backend, true);
0561: throw e;
0562: } catch (Throwable e) {
0563: logger.error("Unexpected exception:", e);
0564: throw new SQLException(Translate.get(
0565: "loadbalancer.request.failed.on.backend",
0566: new String[] {
0567: request.getSqlShortForm(vdb
0568: .getSqlShortFormLength()),
0569: backend.getName(), e.getMessage() }));
0570: }
0571: if (logger.isDebugEnabled())
0572: logger.debug(Translate.get(
0573: "loadbalancer.execute.transaction.on",
0574: new String[] { String.valueOf(tid),
0575: String.valueOf(request.getId()),
0576: backend.getName() }));
0577: return rs;
0578: }
0579: }
0580:
0581: /**
0582: * Execute a stored procedure on the selected backend.
0583: *
0584: * @param proc the stored procedure to execute
0585: * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0586: * false if we must call CallableStatement.execute()
0587: * @param backend the backend that will execute the request
0588: * @param metadataCache the metadataCache if any or null
0589: * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0590: * <code>ExecuteResult</code> object otherwise
0591: * @throws SQLException if an error occurs
0592: */
0593: protected Object executeStoredProcedureOnBackend(
0594: StoredProcedure proc, boolean isExecuteQuery,
0595: DatabaseBackend backend, MetadataCache metadataCache)
0596: throws SQLException, UnreachableBackendException {
0597: // Ok, we have a backend, let's execute the request
0598: AbstractConnectionManager cm = backend
0599: .getConnectionManager(proc.getLogin());
0600:
0601: // Sanity check
0602: if (cm == null) {
0603: String msg = Translate
0604: .get("loadbalancer.connectionmanager.not.found",
0605: new String[] { proc.getLogin(),
0606: backend.getName() });
0607: logger.error(msg);
0608: throw new SQLException(msg);
0609: }
0610:
0611: // Execute the query
0612: if (proc.isAutoCommit()) {
0613: Object result = null;
0614: boolean badConnection;
0615: PooledConnection c = null;
0616: do {
0617: badConnection = false;
0618: PooledConnection previousConnection = c;
0619: // Use a connection just for this request
0620: try {
0621: c = cm.retrieveConnectionInAutoCommit(proc);
0622: } catch (UnreachableBackendException e1) {
0623: String msg = Translate
0624: .get(
0625: "loadbalancer.backend.disabling.unreachable",
0626: backend.getName());
0627: logger.error(msg);
0628: endUserLogger.error(msg);
0629: disableBackend(backend, true);
0630: throw new UnreachableBackendException(Translate
0631: .get("loadbalancer.backend.unreacheable",
0632: backend.getName()));
0633: }
0634:
0635: // Sanity check
0636: if (c == null || c == previousConnection)
0637: throw new UnreachableBackendException(Translate
0638: .get("loadbalancer.backend.no.connection",
0639: backend.getName()));
0640:
0641: // Execute Query
0642: try {
0643: if (isExecuteQuery)
0644: result = AbstractLoadBalancer
0645: .executeCallableStatementExecuteQueryOnBackend(
0646: proc, backend, null, c
0647: .getConnection(),
0648: metadataCache);
0649: else
0650: result = AbstractLoadBalancer
0651: .executeCallableStatementExecuteOnBackend(
0652: proc, backend, null, c,
0653: metadataCache);
0654: } catch (BadConnectionException e) { // Get rid of the bad connection
0655: cm.deleteConnection(c);
0656: if (proc.isPersistentConnection())
0657: cm.deletePersistentConnection(proc
0658: .getPersistentConnectionId());
0659: badConnection = true;
0660: } catch (Throwable e) {
0661: logger.error("Unexpected exception:", e);
0662: throw new SQLException(
0663: Translate
0664: .get(
0665: "loadbalancer.storedprocedure.failed.on.backend",
0666: new String[] {
0667: proc
0668: .getSqlShortForm(vdb
0669: .getSqlShortFormLength()),
0670: backend.getName(),
0671: e.getMessage() }));
0672: } finally {
0673: cm.releaseConnectionInAutoCommit(proc, c);
0674: }
0675: } while (badConnection);
0676:
0677: if (logger.isDebugEnabled())
0678: logger.debug(Translate.get(
0679: "loadbalancer.storedprocedure.on",
0680: new String[] { String.valueOf(proc.getId()),
0681: backend.getName() }));
0682:
0683: return result;
0684: } else { // Inside a transaction
0685: Connection c;
0686: long tid = proc.getTransactionId();
0687:
0688: try {
0689: c = backend
0690: .getConnectionForTransactionAndLazyBeginIfNeeded(
0691: proc, cm);
0692: } catch (UnreachableBackendException e) {
0693: // intercept the UBE to disable the unreachable backend
0694: // and propagate the exception
0695: endUserLogger.error(Translate.get(
0696: "loadbalancer.backend.disabling.unreachable",
0697: backend.getName()));
0698: disableBackend(backend, true);
0699: throw e;
0700: } catch (NoTransactionStartWhenDisablingException e) {
0701: String msg = Translate.get(
0702: "loadbalancer.backend.is.disabling",
0703: new String[] {
0704: proc.getSqlShortForm(vdb
0705: .getSqlShortFormLength()),
0706: backend.getName() });
0707: logger.error(msg);
0708: throw new UnreachableBackendException(msg);
0709: }
0710:
0711: // Sanity check
0712: if (c == null)
0713: throw new SQLException(Translate.get(
0714: "loadbalancer.unable.retrieve.connection",
0715: new String[] { String.valueOf(tid),
0716: backend.getName() }));
0717:
0718: // Execute Query
0719: try {
0720: if (logger.isDebugEnabled())
0721: logger.debug(Translate.get(
0722: "loadbalancer.execute.transaction.on",
0723: new String[] { String.valueOf(tid),
0724: String.valueOf(proc.getId()),
0725: backend.getName() }));
0726: if (isExecuteQuery)
0727: return AbstractLoadBalancer
0728: .executeCallableStatementExecuteQueryOnBackend(
0729: proc, backend, null, c,
0730: metadataCache);
0731: else
0732: return AbstractLoadBalancer
0733: .executeCallableStatementExecuteOnBackend(
0734: proc,
0735: backend,
0736: null,
0737: cm
0738: .retrieveConnectionForTransaction(tid),
0739: metadataCache);
0740: } catch (BadConnectionException e) { // Connection failed, so did the transaction
0741: // Disable the backend.
0742: cm.deleteConnection(tid);
0743: String msg = Translate
0744: .get(
0745: "loadbalancer.backend.disabling.connection.failure",
0746: backend.getName());
0747: logger.error(msg);
0748: endUserLogger.error(msg);
0749: disableBackend(backend, true);
0750: throw new UnreachableBackendException(msg);
0751: } catch (Throwable e) {
0752: logger.error("Unexpected exception:", e);
0753: throw new SQLException(
0754: Translate
0755: .get(
0756: "loadbalancer.storedprocedure.failed.on.backend",
0757: new String[] {
0758: proc
0759: .getSqlShortForm(vdb
0760: .getSqlShortFormLength()),
0761: backend.getName(),
0762: e.getMessage() }));
0763: }
0764: }
0765: }
0766:
0767: /**
0768: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
0769: * MetadataCache)
0770: */
0771: public ControllerResultSet callableStatementExecuteQuery(
0772: StoredProcedure proc, MetadataCache metadataCache)
0773: throws SQLException, AllBackendsFailedException {
0774: CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure(
0775: proc, EXECUTE_QUERY_TASK, metadataCache);
0776: return task.getResult();
0777: }
0778:
0779: /**
0780: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0781: */
0782: public ExecuteUpdateResult callableStatementExecuteUpdate(
0783: StoredProcedure proc) throws SQLException,
0784: AllBackendsFailedException {
0785: CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure(
0786: proc, EXECUTE_UPDATE_TASK, null);
0787: return task.getResult();
0788: }
0789:
0790: /**
0791: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(StoredProcedure,
0792: * MetadataCache)
0793: */
0794: public ExecuteResult callableStatementExecute(StoredProcedure proc,
0795: MetadataCache metadataCache) throws SQLException,
0796: AllBackendsFailedException {
0797: CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure(
0798: proc, CALLABLE_EXECUTE_TASK, metadataCache);
0799: return task.getResult();
0800: }
0801:
0802: private static final int EXECUTE_QUERY_TASK = 0;
0803: private static final int EXECUTE_UPDATE_TASK = 1;
0804: private static final int CALLABLE_EXECUTE_TASK = 2;
0805: private static final int STATEMENT_EXECUTE_TASK = 3;
0806:
0807: /**
0808: * Post the stored procedure call in the threads task list.
0809: * <p>
0810: * Note that macros are also processed here.
0811: *
0812: * @param request the stored procedure to call or the request to execute if
0813: * taskType is STATEMENT_EXECUTE_TASK
0814: * @param taskType one of EXECUTE_QUERY_TASK, EXECUTE_UPDATE_TASK,
0815: * CALLABLE_EXECUTE_TASK or STATEMENT_EXECUTE_TASK
0816: * @param metadataCache the metadataCache if any or null
0817: * @return the task that has been executed (caller can get the result by
0818: * calling getResult())
0819: * @throws SQLException if an error occurs
0820: * @throws AllBackendsFailedException if all backends failed to execute the
0821: * stored procedure
0822: * @throws NoMoreBackendException if no backend was available to execute the
0823: * stored procedure
0824: */
0825: private AbstractTask callStoredProcedure(AbstractRequest request,
0826: int taskType, MetadataCache metadataCache)
0827: throws SQLException, AllBackendsFailedException,
0828: NoMoreBackendException {
0829: // Handle macros
0830: handleMacros(request);
0831:
0832: // Total ordering mainly for distributed virtual databases.
0833: boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0834: true);
0835:
0836: // Log lazy begin if needed
0837: if (request.isLazyTransactionStart())
0838: this .vdb.getRequestManager().logLazyTransactionBegin(
0839: request.getTransactionId());
0840:
0841: // Log request
0842: if (recoveryLog != null) {
0843: boolean mustLog = !request.isReadOnly();
0844: if (taskType != STATEMENT_EXECUTE_TASK) { // faster than (request instanceof StoredProcedure)
0845: DatabaseProcedureSemantic semantic = ((StoredProcedure) request)
0846: .getSemantic();
0847: mustLog = (semantic == null) || semantic.isWrite();
0848: }
0849: if (mustLog)
0850: recoveryLog.logRequestExecuting(request);
0851: }
0852:
0853: int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0854: String.valueOf(request.getId()));
0855:
0856: // Create the task
0857: AbstractTask task;
0858: switch (taskType) {
0859: case EXECUTE_QUERY_TASK:
0860: task = new CallableStatementExecuteQueryTask(
0861: getNbToWait(nbOfThreads), nbOfThreads,
0862: (StoredProcedure) request, metadataCache);
0863: break;
0864: case EXECUTE_UPDATE_TASK:
0865: task = new CallableStatementExecuteUpdateTask(
0866: getNbToWait(nbOfThreads), nbOfThreads,
0867: (StoredProcedure) request);
0868: break;
0869: case CALLABLE_EXECUTE_TASK:
0870: task = new CallableStatementExecuteTask(
0871: getNbToWait(nbOfThreads), nbOfThreads,
0872: (StoredProcedure) request, metadataCache);
0873: break;
0874: case STATEMENT_EXECUTE_TASK:
0875: task = new StatementExecuteTask(getNbToWait(nbOfThreads),
0876: nbOfThreads, (AbstractWriteRequest) request,
0877: metadataCache);
0878: break;
0879: default:
0880: throw new RuntimeException("Unhandled task type "
0881: + taskType + " in callStoredProcedure");
0882: }
0883:
0884: atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
0885: removeFromTotalOrderQueue);
0886:
0887: try {
0888: synchronized (task) {
0889: if (!task.hasCompleted())
0890: waitForTaskCompletion(request.getTimeout() * 1000L,
0891: String.valueOf(request.getId()), task);
0892:
0893: checkTaskCompletion(task);
0894: return task;
0895: }
0896: } finally {
0897: if (!request.isAutoCommit()) { // Check that transaction was not aborted in parallel
0898: try {
0899: this .vdb
0900: .getRequestManager()
0901: .getTransactionMetaData(
0902: new Long(request.getTransactionId()));
0903: } catch (SQLException e) { // Transaction was aborted it cannot be found anymore in the active
0904: // transaction list. Force an abort
0905: logger
0906: .info("Concurrent abort detected, re-inforcing abort of transaction "
0907: + request.getTransactionId());
0908: abort(new TransactionMetaData(request
0909: .getTransactionId(), 0, request.getLogin(),
0910: request.isPersistentConnection(), request
0911: .getPersistentConnectionId()));
0912: }
0913: }
0914: }
0915: }
0916:
0917: /**
0918: * Check the completion status of the task and throws appropriate Exceptions
0919: * if the status of the task was not successful, otherwise do nothing.
0920: *
0921: * @param task the completed AbstractTask
0922: * @throws AllBackendsFailedException if all backends failed to execute the
0923: * request
0924: * @exception NoMoreBackendException if no backends are left to execute the
0925: * request
0926: * @exception SQLException if an error occurs
0927: */
0928: private void checkTaskCompletion(AbstractTask task)
0929: throws NoMoreBackendException, AllBackendsFailedException,
0930: SQLException {
0931: AbstractRequest request = task.getRequest();
0932:
0933: if (task.getSuccess() > 0)
0934: return;
0935:
0936: // Check that someone failed, it might be the case that we only have
0937: // disabling backends left and they have not played this query (thus
0938: // none of them have succeeded or failed).
0939: if (task.getFailed() == 0) {
0940: throw new NoMoreBackendException(Translate
0941: .get("loadbalancer.backendlist.empty"));
0942: }
0943:
0944: if (task.getSuccess() == 0) {
0945: // All backends that executed the query failed
0946: List exceptions = task.getExceptions();
0947: if (exceptions == null)
0948: throw new AllBackendsFailedException(Translate.get(
0949: "loadbalancer.request.failed.all",
0950: new Object[] { request.getType(),
0951: String.valueOf(request.getId()) }));
0952: else {
0953: String errorMsg = Translate.get(
0954: "loadbalancer.request.failed.stack",
0955: new Object[] { request.getType(),
0956: String.valueOf(request.getId()) })
0957: + "\n";
0958: SQLException ex = SQLExceptionFactory.getSQLException(
0959: exceptions, errorMsg);
0960: logger.error(ex.getMessage());
0961: throw ex;
0962: }
0963: }
0964: }
0965:
0966: /**
0967: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
0968: */
0969: public ControllerResultSet getPreparedStatementGetMetaData(
0970: AbstractRequest request) throws SQLException {
0971: // Choose a backend
0972: try {
0973: vdb.acquireReadLockBackendLists();
0974: } catch (InterruptedException e) {
0975: String msg = Translate.get(
0976: "loadbalancer.backendlist.acquire.readlock.failed",
0977: e);
0978: logger.error(msg);
0979: throw new SQLException(msg);
0980: }
0981:
0982: /*
0983: * The backend that will execute the query
0984: */
0985: DatabaseBackend backend = null;
0986:
0987: // Note that vdb lock is released in the finally clause of this try/catch
0988: // block
0989: try {
0990: ArrayList backends = vdb.getBackends();
0991: int size = backends.size();
0992:
0993: if (size == 0)
0994: throw new SQLException(Translate.get(
0995: "loadbalancer.execute.no.backend.available",
0996: request.getId()));
0997:
0998: // Choose the first available backend
0999: for (int i = 0; i < size; i++) {
1000: DatabaseBackend b = (DatabaseBackend) backends.get(i);
1001: if (b.isReadEnabled()) {
1002: backend = b;
1003: break;
1004: }
1005: }
1006: } catch (Throwable e) {
1007: String msg = Translate.get(
1008: "loadbalancer.execute.find.backend.failed",
1009: new String[] {
1010: request.getSqlShortForm(vdb
1011: .getSqlShortFormLength()),
1012: e.getMessage() });
1013: logger.error(msg, e);
1014: throw new SQLException(msg);
1015: } finally {
1016: vdb.releaseReadLockBackendLists();
1017: }
1018:
1019: if (backend == null)
1020: throw new NoMoreBackendException(Translate.get(
1021: "loadbalancer.execute.no.backend.enabled", request
1022: .getId()));
1023:
1024: // Ok, we have a backend, let's execute the request
1025: AbstractConnectionManager cm = backend
1026: .getConnectionManager(request.getLogin());
1027:
1028: // Sanity check
1029: if (cm == null) {
1030: String msg = Translate.get(
1031: "loadbalancer.connectionmanager.not.found",
1032: new String[] { request.getLogin(),
1033: backend.getName() });
1034: logger.error(msg);
1035: throw new SQLException(msg);
1036: }
1037:
1038: // Execute the query
1039: if (request.isAutoCommit()) {
1040: ControllerResultSet rs = null;
1041: boolean badConnection;
1042: do {
1043: badConnection = false;
1044: // Use a connection just for this request
1045: PooledConnection c = null;
1046: try {
1047: c = cm.retrieveConnectionInAutoCommit(request);
1048: } catch (UnreachableBackendException e1) {
1049: String msg = Translate
1050: .get(
1051: "loadbalancer.backend.disabling.unreachable",
1052: backend.getName());
1053: logger.error(msg);
1054: endUserLogger.error(msg);
1055: disableBackend(backend, true);
1056: // Retry on a different backend
1057: return getPreparedStatementGetMetaData(request);
1058: }
1059:
1060: // Sanity check
1061: if (c == null)
1062: throw new SQLException(Translate.get(
1063: "loadbalancer.backend.no.connection",
1064: backend.getName()));
1065:
1066: // Execute Query
1067: try {
1068: rs = preparedStatementGetMetaDataOnBackend(request
1069: .getSqlOrTemplate(), backend, c
1070: .getConnection());
1071: cm.releaseConnectionInAutoCommit(request, c);
1072: } catch (SQLException e) {
1073: cm.releaseConnectionInAutoCommit(request, c);
1074: throw SQLExceptionFactory
1075: .getSQLException(
1076: e,
1077: Translate
1078: .get(
1079: "loadbalancer.request.failed.on.backend",
1080: new String[] {
1081: request
1082: .getSqlShortForm(vdb
1083: .getSqlShortFormLength()),
1084: backend
1085: .getName(),
1086: e
1087: .getMessage() }));
1088: } catch (BadConnectionException e) { // Get rid of the bad connection
1089: cm.deleteConnection(c);
1090: badConnection = true;
1091: } catch (Throwable e) {
1092: cm.releaseConnectionInAutoCommit(request, c);
1093:
1094: logger.error("Unexpected exception:", e);
1095: throw new SQLException(
1096: Translate
1097: .get(
1098: "loadbalancer.request.failed.on.backend",
1099: new String[] {
1100: request
1101: .getSqlShortForm(vdb
1102: .getSqlShortFormLength()),
1103: backend.getName(),
1104: e.getMessage() }));
1105: }
1106: } while (badConnection);
1107: if (logger.isDebugEnabled())
1108: logger.debug(Translate.get("loadbalancer.execute.on",
1109: new String[] { String.valueOf(request.getId()),
1110: backend.getName() }));
1111: return rs;
1112: } else { // Inside a transaction
1113: Connection c;
1114: long tid = request.getTransactionId();
1115:
1116: try {
1117: c = backend
1118: .getConnectionForTransactionAndLazyBeginIfNeeded(
1119: request, cm);
1120: } catch (UnreachableBackendException e1) {
1121: String msg = Translate.get(
1122: "loadbalancer.backend.disabling.unreachable",
1123: backend.getName());
1124: logger.error(msg);
1125: endUserLogger.error(msg);
1126: disableBackend(backend, true);
1127: throw new SQLException(Translate.get(
1128: "loadbalancer.backend.unreacheable", backend
1129: .getName()));
1130: } catch (NoTransactionStartWhenDisablingException e) {
1131: String msg = Translate.get(
1132: "loadbalancer.backend.is.disabling",
1133: new String[] {
1134: request.getSqlShortForm(vdb
1135: .getSqlShortFormLength()),
1136: backend.getName() });
1137: logger.error(msg);
1138: throw new SQLException(msg);
1139: }
1140:
1141: // Sanity check
1142: if (c == null)
1143: throw new SQLException(Translate.get(
1144: "loadbalancer.unable.retrieve.connection",
1145: new String[] { String.valueOf(tid),
1146: backend.getName() }));
1147:
1148: // Execute Query
1149: ControllerResultSet rs = null;
1150: try {
1151: rs = preparedStatementGetMetaDataOnBackend(request
1152: .getSqlOrTemplate(), backend, c);
1153: } catch (SQLException e) {
1154: throw e;
1155: } catch (BadConnectionException e) { // Connection failed, so did the transaction
1156: // Disable the backend.
1157: cm.deleteConnection(tid);
1158: String msg = Translate
1159: .get(
1160: "loadbalancer.backend.disabling.connection.failure",
1161: backend.getName());
1162: logger.error(msg);
1163: endUserLogger.error(msg);
1164: disableBackend(backend, true);
1165: throw new SQLException(msg);
1166: } catch (Throwable e) {
1167:
1168: logger.error("Unexpected exception:", e);
1169: throw new SQLException(Translate.get(
1170: "loadbalancer.request.failed.on.backend",
1171: new String[] {
1172: request.getSqlShortForm(vdb
1173: .getSqlShortFormLength()),
1174: backend.getName(), e.getMessage() }));
1175: }
1176: if (logger.isDebugEnabled())
1177: logger.debug(Translate.get(
1178: "loadbalancer.execute.transaction.on",
1179: new String[] { String.valueOf(tid),
1180: String.valueOf(request.getId()),
1181: backend.getName() }));
1182: return rs;
1183: }
1184: }
1185:
1186: /*
1187: * Transaction management
1188: */
1189:
1190: /**
1191: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1192: */
1193: public void abort(TransactionMetaData tm) throws SQLException {
1194: long tid = tm.getTransactionId();
1195: boolean executeRollback = false;
1196: DistributedRollback toqObject = null;
1197: /*
1198: * Let previous queries be flushed into the load balancer queues so that we
1199: * can abort them and that no queries for that transaction wait in the total
1200: * order queue while we are aborting. Note that the wait and remove from the
1201: * total order queue will be done in the call to rollback at the end of this
1202: * method.
1203: */
1204: if (vdb.getTotalOrderQueue() != null) {
1205: toqObject = new DistributedRollback(tm.getLogin(), tid);
1206: waitForTotalOrder(toqObject, false);
1207: }
1208:
1209: try {
1210: // Acquire the lock
1211: String requestDescription = "abort " + tid;
1212: int nbOfThreads = acquireLockAndCheckNbOfThreads(toqObject,
1213: requestDescription);
1214:
1215: boolean rollbackInProgress = false;
1216: synchronized (enabledBackends) {
1217: // Abort all queries on all backends that have started this transaction
1218: for (int i = 0; i < nbOfThreads; i++) {
1219: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1220: .get(i);
1221: rollbackInProgress = rollbackInProgress
1222: || backend.getTaskQueues()
1223: .abortAllQueriesForTransaction(tid);
1224: }
1225: }
1226:
1227: // Release the lock
1228: backendListLock.releaseRead();
1229:
1230: if (rollbackInProgress) { // already aborting
1231: if (vdb.getTotalOrderQueue() != null)
1232: removeObjectFromAndNotifyTotalOrderQueue(toqObject);
1233: return;
1234: }
1235:
1236: executeRollback = true;
1237: rollback(tm);
1238: } catch (NoMoreBackendException ignore) {
1239: if (!executeRollback && (recoveryLog != null))
1240: recoveryLog.logAbort(tm); // Executing status
1241: }
1242: }
1243:
1244: /**
1245: * Begins a new transaction.
1246: *
1247: * @param tm the transaction marker metadata
1248: * @exception SQLException if an error occurs
1249: */
1250: public final void begin(TransactionMetaData tm) throws SQLException {
1251: }
1252:
1253: /**
1254: * Commits a transaction.
1255: *
1256: * @param tm the transaction marker metadata
1257: * @exception SQLException if an error occurs
1258: */
1259: public void commit(TransactionMetaData tm) throws SQLException {
1260: long tid = tm.getTransactionId();
1261: Long lTid = new Long(tid);
1262:
1263: // Ordering for distributed virtual database
1264: boolean canTakeReadLock = false;
1265: DistributedCommit totalOrderCommit = null;
1266: if (vdb.getTotalOrderQueue() != null) {
1267: // Total ordering mainly for distributed virtual databases.
1268: // If waitForTotalOrder returns true then the query has been scheduled in
1269: // total order and there is no need to take a write lock later to resolve
1270: // potential conflicts.
1271: totalOrderCommit = new DistributedCommit(tm.getLogin(), tid);
1272: canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
1273: if (!canTakeReadLock)
1274: // This is a local commit no total order info
1275: totalOrderCommit = null;
1276: }
1277:
1278: // Update the recovery log
1279: if (recoveryLog != null)
1280: recoveryLog.logCommit(tm);
1281:
1282: // Acquire the lock
1283: String requestDescription = "commit " + tid;
1284: int nbOfThreads = acquireLockAndCheckNbOfThreads(
1285: totalOrderCommit, requestDescription);
1286:
1287: // Build the list of backends that need to commit this transaction
1288: ArrayList commitList = new ArrayList(nbOfThreads);
1289: for (int i = 0; i < nbOfThreads; i++) {
1290: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1291: .get(i);
1292: if (backend.isStartedTransaction(lTid))
1293: commitList.add(backend);
1294: }
1295:
1296: int nbOfThreadsToCommit = commitList.size();
1297: CommitTask task = null;
1298: if (nbOfThreadsToCommit != 0)
1299: task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1300: nbOfThreadsToCommit, tm);
1301:
1302: // Post the task in the non-conflicting queues.
1303: synchronized (enabledBackends) {
1304: for (int i = 0; i < nbOfThreadsToCommit; i++) {
1305: DatabaseBackend backend = (DatabaseBackend) commitList
1306: .get(i);
1307: backend.getTaskQueues()
1308: .addTaskToBackendTotalOrderQueue(task);
1309: }
1310: }
1311:
1312: // Release the lock
1313: backendListLock.releaseRead();
1314:
1315: // Unblock next query from total order queue
1316: if (totalOrderCommit != null)
1317: removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1318:
1319: // Check if someone had something to commit
1320: if (task == null)
1321: return;
1322:
1323: synchronized (task) {
1324: if (!task.hasCompleted())
1325: waitForTaskCompletion(tm.getTimeout(),
1326: requestDescription, task);
1327:
1328: if (task.getSuccess() == 0) { // All tasks failed
1329: List exceptions = task.getExceptions();
1330: if (exceptions == null)
1331: throw new SQLException(Translate.get(
1332: "loadbalancer.commit.all.failed", tid));
1333: else {
1334: String errorMsg = Translate.get(
1335: "loadbalancer.commit.failed.stack", tid)
1336: + "\n";
1337: SQLException ex = SQLExceptionFactory
1338: .getSQLException(exceptions, errorMsg);
1339: logger.error(ex.getMessage());
1340: throw ex;
1341: }
1342: }
1343: }
1344: }
1345:
1346: /**
1347: * Rollbacks a transaction.
1348: *
1349: * @param tm the transaction marker metadata
1350: * @exception SQLException if an error occurs
1351: */
1352: public void rollback(TransactionMetaData tm) throws SQLException {
1353: long tid = tm.getTransactionId();
1354: Long lTid = new Long(tid);
1355:
1356: // Ordering for distributed virtual database
1357: DistributedRollback totalOrderRollback = null;
1358: boolean canTakeReadLock = false;
1359: if (vdb.getTotalOrderQueue() != null) {
1360: totalOrderRollback = new DistributedRollback(tm.getLogin(),
1361: tid);
1362: // Total ordering mainly for distributed virtual databases.
1363: // If waitForTotalOrder returns true then the query has been scheduled in
1364: // total order and there is no need to take a write lock later to resolve
1365: // potential conflicts.
1366: canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1367: false);
1368: if (!canTakeReadLock)
1369: // This is a local rollback no total order info
1370: totalOrderRollback = null;
1371: }
1372:
1373: // Update the recovery log
1374: if (recoveryLog != null)
1375: recoveryLog.logRollback(tm);
1376:
1377: // Acquire the lock
1378: String requestDescription = "rollback " + tid;
1379: int nbOfThreads = acquireLockAndCheckNbOfThreads(
1380: totalOrderRollback, requestDescription);
1381:
1382: // Build the list of backends that need to rollback this transaction
1383: ArrayList rollbackList = new ArrayList();
1384: for (int i = 0; i < nbOfThreads; i++) {
1385: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1386: .get(i);
1387: if (backend.isStartedTransaction(lTid))
1388: rollbackList.add(backend);
1389: }
1390:
1391: int nbOfThreadsToRollback = rollbackList.size();
1392: RollbackTask task = null;
1393: task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1394: nbOfThreadsToRollback, tm);
1395:
1396: // Post the task in the non-conflicting queues.
1397: synchronized (enabledBackends) {
1398: for (int i = 0; i < nbOfThreadsToRollback; i++) {
1399: DatabaseBackend backend = (DatabaseBackend) rollbackList
1400: .get(i);
1401: backend.getTaskQueues()
1402: .addTaskToBackendTotalOrderQueue(task);
1403: }
1404: }
1405:
1406: // Release the lock
1407: backendListLock.releaseRead();
1408:
1409: // Unblock next query from total order queue
1410: if (totalOrderRollback != null)
1411: removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1412:
1413: // Check if someone had something to rollback
1414: if (nbOfThreadsToRollback == 0)
1415: return;
1416:
1417: synchronized (task) {
1418: if (!task.hasCompleted())
1419: waitForTaskCompletion(tm.getTimeout(),
1420: requestDescription, task);
1421:
1422: if (task.getSuccess() > 0)
1423: return;
1424:
1425: // All tasks failed
1426: List exceptions = task.getExceptions();
1427: if (exceptions == null)
1428: throw new SQLException(Translate.get(
1429: "loadbalancer.rollback.all.failed", tid));
1430: else {
1431: String errorMsg = Translate.get(
1432: "loadbalancer.rollback.failed.stack", tid)
1433: + "\n";
1434: SQLException ex = SQLExceptionFactory.getSQLException(
1435: exceptions, errorMsg);
1436: logger.error(ex.getMessage());
1437: throw ex;
1438: }
1439: }
1440: }
1441:
1442: /**
1443: * Rollback a transaction to a savepoint
1444: *
1445: * @param tm The transaction marker metadata
1446: * @param savepointName The name of the savepoint
1447: * @throws SQLException if an error occurs
1448: */
1449: public void rollbackToSavepoint(TransactionMetaData tm,
1450: String savepointName) throws SQLException {
1451: long tid = tm.getTransactionId();
1452: Long lTid = new Long(tid);
1453:
1454: // Ordering for distributed virtual database
1455: DistributedRollbackToSavepoint totalOrderRollback = null;
1456: boolean canTakeReadLock = false;
1457: if (vdb.getTotalOrderQueue() != null) {
1458: totalOrderRollback = new DistributedRollbackToSavepoint(
1459: tid, savepointName);
1460: // Total ordering mainly for distributed virtual databases.
1461: // If waitForTotalOrder returns true then the query has been scheduled in
1462: // total order and there is no need to take a write lock later to resolve
1463: // potential conflicts.
1464: canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1465: false);
1466: if (!canTakeReadLock)
1467: // This is a local commit no total order info
1468: totalOrderRollback = null;
1469: }
1470:
1471: // Update the recovery log
1472: if (recoveryLog != null)
1473: recoveryLog.logRollbackToSavepoint(tm, savepointName);
1474:
1475: // Acquire the lock
1476: String requestDescription = "rollback " + savepointName + " "
1477: + tid;
1478: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1479: requestDescription);
1480:
1481: // Build the list of backends that need to rollback this transaction
1482: ArrayList rollbackList = new ArrayList();
1483: for (int i = 0; i < nbOfThreads; i++) {
1484: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1485: .get(i);
1486: if (backend.isStartedTransaction(lTid))
1487: rollbackList.add(backend);
1488: }
1489:
1490: int nbOfThreadsToRollback = rollbackList.size();
1491: RollbackToSavepointTask task = null;
1492: if (nbOfThreadsToRollback != 0)
1493: task = new RollbackToSavepointTask(
1494: getNbToWait(nbOfThreadsToRollback),
1495: nbOfThreadsToRollback, tm, savepointName);
1496:
1497: // Post the task in the non-conflicting queues.
1498: synchronized (enabledBackends) {
1499: for (int i = 0; i < nbOfThreadsToRollback; i++) {
1500: DatabaseBackend backend = (DatabaseBackend) rollbackList
1501: .get(i);
1502: backend.getTaskQueues()
1503: .addTaskToBackendTotalOrderQueue(task);
1504: }
1505: }
1506:
1507: // Release the lock
1508: backendListLock.releaseRead();
1509:
1510: // Unblock next query from total order queue
1511: if (totalOrderRollback != null)
1512: removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1513:
1514: // Check if someone had something to rollback
1515: if (task == null)
1516: return;
1517:
1518: synchronized (task) {
1519: if (!task.hasCompleted())
1520: waitForTaskCompletion(tm.getTimeout(),
1521: requestDescription, task);
1522:
1523: if (task.getSuccess() == 0) { // All tasks failed
1524: List exceptions = task.getExceptions();
1525: if (exceptions == null)
1526: throw new SQLException(
1527: Translate
1528: .get(
1529: "loadbalancer.rollbacksavepoint.all.failed",
1530: new String[] {
1531: savepointName,
1532: String.valueOf(tid) }));
1533: else {
1534: String errorMsg = Translate
1535: .get(
1536: "loadbalancer.rollbacksavepoint.failed.stack",
1537: new String[] { savepointName,
1538: String.valueOf(tid) })
1539: + "\n";
1540: SQLException ex = SQLExceptionFactory
1541: .getSQLException(exceptions, errorMsg);
1542: logger.error(ex.getMessage());
1543: throw ex;
1544: }
1545: }
1546: }
1547: }
1548:
1549: /**
1550: * Release a savepoint from a transaction
1551: *
1552: * @param tm The transaction marker metadata
1553: * @param savepointName The name of the savepoint ro release
1554: * @throws SQLException if an error occurs
1555: */
1556: public void releaseSavepoint(TransactionMetaData tm,
1557: String savepointName) throws SQLException {
1558: long tid = tm.getTransactionId();
1559: Long lTid = new Long(tid);
1560:
1561: // Ordering for distributed virtual database
1562: DistributedReleaseSavepoint totalOrderRelease = null;
1563: boolean canTakeReadLock = false;
1564: if (vdb.getTotalOrderQueue() != null) {
1565: totalOrderRelease = new DistributedReleaseSavepoint(tid,
1566: savepointName);
1567: // Total ordering mainly for distributed virtual databases.
1568: // If waitForTotalOrder returns true then the query has been scheduled in
1569: // total order and there is no need to take a write lock later to resolve
1570: // potential conflicts.
1571: canTakeReadLock = waitForTotalOrder(totalOrderRelease,
1572: false);
1573: if (!canTakeReadLock)
1574: // This is a local commit no total order info
1575: totalOrderRelease = null;
1576: }
1577:
1578: // Update the recovery log
1579: if (recoveryLog != null)
1580: recoveryLog.logReleaseSavepoint(tm, savepointName);
1581:
1582: // Acquire the lock
1583: String requestDescription = "release savepoint "
1584: + savepointName + " " + tid;
1585: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1586: requestDescription);
1587:
1588: // Build the list of backends that need to rollback this transaction
1589: ArrayList savepointList = new ArrayList();
1590: for (int i = 0; i < nbOfThreads; i++) {
1591: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1592: .get(i);
1593: if (backend.isStartedTransaction(lTid))
1594: savepointList.add(backend);
1595: }
1596:
1597: int nbOfSavepoints = savepointList.size();
1598: ReleaseSavepointTask task = null;
1599: if (nbOfSavepoints != 0)
1600: task = new ReleaseSavepointTask(getNbToWait(nbOfThreads),
1601: nbOfThreads, tm, savepointName);
1602:
1603: // Post the task in the non-conflicting queues.
1604: synchronized (enabledBackends) {
1605: for (int i = 0; i < nbOfSavepoints; i++) {
1606: DatabaseBackend backend = (DatabaseBackend) savepointList
1607: .get(i);
1608: backend.getTaskQueues()
1609: .addTaskToBackendTotalOrderQueue(task);
1610: }
1611: }
1612:
1613: // Release the lock
1614: backendListLock.releaseRead();
1615:
1616: // Unblock next query from total order queue
1617: if (totalOrderRelease != null)
1618: removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease);
1619:
1620: // Check if someone had something to release
1621: if (task == null)
1622: return;
1623:
1624: synchronized (task) {
1625: if (!task.hasCompleted())
1626: waitForTaskCompletion(tm.getTimeout(),
1627: requestDescription, task);
1628:
1629: if (task.getSuccess() == 0) { // All tasks failed
1630: List exceptions = task.getExceptions();
1631: if (exceptions == null)
1632: throw new SQLException(Translate.get(
1633: "loadbalancer.releasesavepoint.all.failed",
1634: new String[] { savepointName,
1635: String.valueOf(tid) }));
1636: else {
1637: String errorMsg = Translate
1638: .get(
1639: "loadbalancer.releasesavepoint.failed.stack",
1640: new String[] { savepointName,
1641: String.valueOf(tid) })
1642: + "\n";
1643: SQLException ex = SQLExceptionFactory
1644: .getSQLException(exceptions, errorMsg);
1645: logger.error(ex.getMessage());
1646: throw ex;
1647: }
1648: }
1649: }
1650: }
1651:
1652: /**
1653: * Set a savepoint to a transaction.
1654: *
1655: * @param tm The transaction marker metadata
1656: * @param savepointName The name of the new savepoint
1657: * @throws SQLException if an error occurs
1658: */
1659: public void setSavepoint(TransactionMetaData tm,
1660: String savepointName) throws SQLException {
1661: long tid = tm.getTransactionId();
1662:
1663: // Ordering for distributed virtual database
1664: DistributedSetSavepoint totalOrderSavepoint = null;
1665: boolean canTakeReadLock = false;
1666: if (vdb.getTotalOrderQueue() != null) {
1667: totalOrderSavepoint = new DistributedSetSavepoint(tm
1668: .getLogin(), tid, savepointName);
1669: // Total ordering mainly for distributed virtual databases.
1670: // If waitForTotalOrder returns true then the query has been scheduled in
1671: // total order and there is no need to take a write lock later to resolve
1672: // potential conflicts.
1673: canTakeReadLock = waitForTotalOrder(totalOrderSavepoint,
1674: false);
1675: if (!canTakeReadLock)
1676: // This is a local commit no total order info
1677: totalOrderSavepoint = null;
1678: }
1679:
1680: // Update the recovery log
1681: if (recoveryLog != null)
1682: recoveryLog.logSetSavepoint(tm, savepointName);
1683:
1684: // Acquire the lock
1685: String requestDescription = "set savepoint " + savepointName
1686: + " " + tid;
1687: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1688: requestDescription);
1689:
1690: SavepointTask task = null;
1691:
1692: // Post the task in the non-conflicting queues of all backends.
1693: synchronized (enabledBackends) {
1694: if (nbOfThreads != 0) {
1695: task = new SavepointTask(getNbToWait(nbOfThreads),
1696: nbOfThreads, tm, savepointName);
1697: for (int i = 0; i < nbOfThreads; i++) {
1698: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1699: .get(i);
1700: backend.getTaskQueues()
1701: .addTaskToBackendTotalOrderQueue(task);
1702: }
1703: }
1704: }
1705:
1706: // Release the lock
1707: backendListLock.releaseRead();
1708:
1709: // Unblock next query from total order queue
1710: if (totalOrderSavepoint != null)
1711: removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint);
1712:
1713: // Check if someone had something to release
1714: if (task == null)
1715: return;
1716:
1717: synchronized (task) {
1718: if (!task.hasCompleted())
1719: waitForTaskCompletion(tm.getTimeout(),
1720: requestDescription, task);
1721:
1722: if (task.getSuccess() == 0) { // All tasks failed
1723: List exceptions = task.getExceptions();
1724: if (exceptions == null)
1725: throw new SQLException(Translate.get(
1726: "loadbalancer.setsavepoint.all.failed",
1727: new String[] { savepointName,
1728: String.valueOf(tid) }));
1729: else {
1730: String errorMsg = Translate.get(
1731: "loadbalancer.setsavepoint.failed.stack",
1732: new String[] { savepointName,
1733: String.valueOf(tid) })
1734: + "\n";
1735: SQLException ex = SQLExceptionFactory
1736: .getSQLException(exceptions, errorMsg);
1737: logger.error(ex.getMessage());
1738: throw ex;
1739: }
1740: }
1741: }
1742: }
1743:
1744: //
1745: // Utility functions
1746: //
1747:
1748: /**
1749: * Check in which queue the task should be posted and atomically posts the
1750: * task in the queue of all backends. The list of locks acquired by the
1751: * request is set on the task if the request is in autocommit mode (if it is
1752: * in a transaction it is automatically added to the transaction lock list).
1753: * The list of lock can be null if no lock has been acquired.
1754: *
1755: * @param task the task to post
1756: * @param nbOfThreads number of threads in the backend list (must already be
1757: * locked)
1758: * @param removeFromTotalOrderQueue true if the query must be removed from the
1759: * total order queue
1760: */
1761: private void atomicTaskPostInQueueAndReleaseLock(
1762: AbstractRequest request, AbstractTask task,
1763: int nbOfThreads, boolean removeFromTotalOrderQueue) {
1764: synchronized (enabledBackends) {
1765: for (int i = 0; i < nbOfThreads; i++) {
1766: BackendTaskQueues queues = ((DatabaseBackend) enabledBackends
1767: .get(i)).getTaskQueues();
1768: queues.addTaskToBackendTotalOrderQueue(task);
1769: }
1770: }
1771:
1772: backendListLock.releaseRead();
1773:
1774: // Unblock next query from total order queue
1775: if (removeFromTotalOrderQueue) {
1776: removeObjectFromAndNotifyTotalOrderQueue(request);
1777: }
1778: }
1779:
1780: /**
1781: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1782: * long)
1783: */
1784: public void closePersistentConnection(String login,
1785: long persistentConnectionId) throws SQLException {
1786: /*
1787: * We assume a synchronous execution and connection closing can only come
1788: * after all requests have been executed in that connection. We post to all
1789: * backends and let the task deal with whether that backend had a persistent
1790: * connection or not.
1791: */
1792:
1793: String requestDescription = "closing persistent connection "
1794: + persistentConnectionId;
1795: int nbOfThreads = 0;
1796:
1797: DistributedClosePersistentConnection totalOrderQueueObject = null;
1798: boolean removefromTotalOrder = false;
1799: if (vdb.getTotalOrderQueue() != null) {
1800: totalOrderQueueObject = new DistributedClosePersistentConnection(
1801: login, persistentConnectionId);
1802: removefromTotalOrder = waitForTotalOrder(
1803: totalOrderQueueObject, false);
1804: }
1805:
1806: ClosePersistentConnectionTask task = null;
1807: try {
1808: nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1809: requestDescription);
1810:
1811: task = new ClosePersistentConnectionTask(
1812: getNbToWait(nbOfThreads), nbOfThreads, login,
1813: persistentConnectionId);
1814:
1815: // Post the task in the non-conflicting queues.
1816: synchronized (enabledBackends) {
1817: for (int i = 0; i < nbOfThreads; i++) {
1818: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1819: .get(i);
1820: backend.getTaskQueues()
1821: .addTaskToBackendTotalOrderQueue(task);
1822: }
1823: }
1824:
1825: // Release the lock
1826: backendListLock.releaseRead();
1827:
1828: if (removefromTotalOrder)
1829: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1830: totalOrderQueueObject = null;
1831:
1832: synchronized (task) {
1833: if (!task.hasCompleted())
1834: try {
1835: waitForTaskCompletion(0, requestDescription,
1836: task);
1837: } catch (SQLException ignore) {
1838: }
1839: }
1840: } finally {
1841: if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1842: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1843: }
1844:
1845: if (logger.isDebugEnabled())
1846: logger.debug(requestDescription + " completed on "
1847: + nbOfThreads + " backends.");
1848: }
1849: }
1850:
1851: /**
1852: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1853: * long)
1854: */
1855: public void openPersistentConnection(String login,
1856: long persistentConnectionId) throws SQLException {
1857: String requestDescription = "opening persistent connection "
1858: + persistentConnectionId;
1859: int nbOfThreads = 0;
1860:
1861: DistributedOpenPersistentConnection totalOrderQueueObject = null;
1862: if (vdb.getTotalOrderQueue() != null) {
1863: totalOrderQueueObject = new DistributedOpenPersistentConnection(
1864: login, persistentConnectionId);
1865: waitForTotalOrder(totalOrderQueueObject, true);
1866: }
1867:
1868: OpenPersistentConnectionTask task = null;
1869: try {
1870: nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1871: requestDescription);
1872:
1873: task = new OpenPersistentConnectionTask(
1874: getNbToWait(nbOfThreads), nbOfThreads, login,
1875: persistentConnectionId);
1876:
1877: // Post the task in the non-conflicting queues.
1878: synchronized (enabledBackends) {
1879: for (int i = 0; i < nbOfThreads; i++) {
1880: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1881: .get(i);
1882: backend.getTaskQueues()
1883: .addTaskToBackendTotalOrderQueue(task);
1884: }
1885: }
1886:
1887: // Release the lock
1888: backendListLock.releaseRead();
1889:
1890: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1891: totalOrderQueueObject = null;
1892:
1893: synchronized (task) {
1894: if (!task.hasCompleted())
1895: try {
1896: waitForTaskCompletion(0, requestDescription,
1897: task);
1898: } catch (SQLException ignore) {
1899: }
1900: }
1901: } finally {
1902: if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1903: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1904: }
1905:
1906: if (logger.isDebugEnabled())
1907: logger.debug(requestDescription + " completed on "
1908: + nbOfThreads + " backends.");
1909: }
1910: }
1911:
1912: /**
1913: * Enables a Backend that was previously disabled.
1914: * <p>
1915: * Ask the corresponding connection manager to initialize the connections if
1916: * needed.
1917: * <p>
1918: * No sanity checks are performed by this function.
1919: *
1920: * @param db the database backend to enable
1921: * @param writeEnabled True if the backend must be enabled for writes
1922: * @throws SQLException if an error occurs
1923: */
1924: public synchronized void enableBackend(DatabaseBackend db,
1925: boolean writeEnabled) throws SQLException {
1926: if (!db.isInitialized())
1927: db.initializeConnections();
1928:
1929: if (writeEnabled && db.isWriteCanBeEnabled()) {
1930: BackendTaskQueues taskqueues = new BackendTaskQueues(db,
1931: waitForCompletionPolicy, this .vdb
1932: .getRequestManager());
1933: // Create the new backend task queues
1934: try {
1935: ObjectName taskQueuesObjectName = JmxConstants
1936: .getBackendTaskQueuesObjectName(db
1937: .getVirtualDatabaseName(), db.getName());
1938: if (MBeanServerManager.getInstance().isRegistered(
1939: taskQueuesObjectName)) {
1940: MBeanServerManager.unregister(taskQueuesObjectName);
1941: }
1942: MBeanServerManager.registerMBean(
1943: new BackendTaskQueuesControl(taskqueues),
1944: taskQueuesObjectName);
1945: } catch (Exception e) {
1946: if (logger.isWarnEnabled()) {
1947: logger.warn(
1948: "failed to register task queue mbeans for "
1949: + db, e);
1950: }
1951: }
1952: db.setTaskQueues(taskqueues);
1953: db.startWorkerThreads(this );
1954: db.startDeadlockDetectionThread(this .vdb);
1955: db.enableWrite();
1956: }
1957:
1958: db.enableRead();
1959: try {
1960: backendListLock.acquireWrite();
1961: } catch (InterruptedException e) {
1962: logger
1963: .error(
1964: "Error while acquiring write lock in enableBackend",
1965: e);
1966: }
1967:
1968: synchronized (enabledBackends) {
1969: enabledBackends.add(db);
1970: }
1971:
1972: backendListLock.releaseWrite();
1973: }
1974:
1975: /**
1976: * Disables a backend that was previously enabled.
1977: * <p>
1978: * Ask the corresponding connection manager to finalize the connections if
1979: * needed.
1980: * <p>
1981: * No sanity checks are performed by this function.
1982: *
1983: * @param db the database backend to disable
1984: * @param forceDisable true if disabling must be forced on the backend
1985: * @throws SQLException if an error occurs
1986: */
1987: public void disableBackend(DatabaseBackend db, boolean forceDisable)
1988: throws SQLException {
1989: if (!db.disable()) {
1990: // Another thread has already started the disable process
1991: return;
1992: }
1993: synchronized (this ) {
1994: try {
1995: backendListLock.acquireWrite();
1996: } catch (InterruptedException e) {
1997: logger
1998: .error(
1999: "Error while acquiring write lock in enableBackend",
2000: e);
2001: }
2002:
2003: try {
2004: synchronized (enabledBackends) {
2005: enabledBackends.remove(db);
2006: if (enabledBackends.isEmpty()) {
2007: // Cleanup schema for any remaining locks
2008: this .vdb.getRequestManager().setDatabaseSchema(
2009: null, false);
2010: }
2011: }
2012:
2013: if (!forceDisable)
2014: terminateThreadsAndConnections(db);
2015: } finally {
2016: backendListLock.releaseWrite();
2017: }
2018:
2019: if (forceDisable) {
2020: db.shutdownConnectionManagers();
2021: terminateThreadsAndConnections(db, false);
2022: }
2023:
2024: // sanity check on backend's active transaction
2025: if (!db.getActiveTransactions().isEmpty()) {
2026: if (logger.isWarnEnabled()) {
2027: logger.warn("Active transactions after backend "
2028: + db.getName() + " is disabled: "
2029: + db.getActiveTransactions());
2030: }
2031: }
2032: }
2033: }
2034:
2035: private void terminateThreadsAndConnections(DatabaseBackend db)
2036: throws SQLException {
2037: terminateThreadsAndConnections(db, true);
2038: }
2039:
2040: private void terminateThreadsAndConnections(DatabaseBackend db,
2041: boolean wait) throws SQLException {
2042: db.terminateWorkerThreads(wait);
2043: db.terminateDeadlockDetectionThread();
2044:
2045: if (db.isInitialized())
2046: db.finalizeConnections();
2047: }
2048:
2049: //
2050: // Debug/Monitoring
2051: //
2052:
2053: /**
2054: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2055: */
2056: public String getXmlImpl() {
2057: StringBuffer info = new StringBuffer();
2058: info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2059: if (waitForCompletionPolicy != null)
2060: info.append(waitForCompletionPolicy.getXml());
2061: if (macroHandler != null)
2062: info.append(macroHandler.getXml());
2063: info.append(getRaidb1Xml());
2064: info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2065: return info.toString();
2066: }
2067:
2068: /**
2069: * Surrounding raidb1 tags can be treated by <method>getXmlImpl </method>
2070: * above, but more detailed content have to be returned by the method
2071: * <method>getRaidb1Xml </method> below.
2072: *
2073: * @return content of Raidb1 xml
2074: */
2075: public abstract String getRaidb1Xml();
2076: }
|