0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Contact: sequoia@continuent.org
0007: *
0008: * Licensed under the Apache License, Version 2.0 (the "License");
0009: * you may not use this file except in compliance with the License.
0010: * You may obtain a copy of the License at
0011: *
0012: * http://www.apache.org/licenses/LICENSE-2.0
0013: *
0014: * Unless required by applicable law or agreed to in writing, software
0015: * distributed under the License is distributed on an "AS IS" BASIS,
0016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017: * See the License for the specific language governing permissions and
0018: * limitations under the License.
0019: *
0020: * Initial developer(s): Emmanuel Cecchet.
0021: * Contributor(s): Jean-Bernard van Zuylen.
0022: */package org.continuent.sequoia.controller.loadbalancer.raidb2;
0023:
0024: import java.sql.Connection;
0025: import java.sql.SQLException;
0026: import java.util.ArrayList;
0027: import java.util.List;
0028:
0029: import org.continuent.sequoia.common.exceptions.BadConnectionException;
0030: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0031: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0032: import org.continuent.sequoia.common.exceptions.NotImplementedException;
0033: import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
0034: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0035: import org.continuent.sequoia.common.i18n.Translate;
0036: import org.continuent.sequoia.common.log.Trace;
0037: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0038: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0039: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0040: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0041: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0042: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0043: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0044: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0045: import org.continuent.sequoia.controller.connection.PooledConnection;
0046: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0047: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0048: import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
0049: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0050: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableException;
0051: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
0052: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableRule;
0053: import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0054: import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask;
0055: import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask;
0056: import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask;
0057: import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
0058: import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
0059: import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
0060: import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
0061: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0062: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
0063: import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
0064: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask;
0065: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask;
0066: import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0067: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0068: import org.continuent.sequoia.controller.requests.AbstractRequest;
0069: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0070: import org.continuent.sequoia.controller.requests.ParsingGranularities;
0071: import org.continuent.sequoia.controller.requests.SelectRequest;
0072: import org.continuent.sequoia.controller.requests.StoredProcedure;
0073: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0074: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
0075: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0076: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0077: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0078: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0079: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0080: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0081:
0082: /**
0083: * RAIDb-2 load balancer.
0084: * <p>
0085: * This class is abstract because the read requests coming from the
0086: * Request Manager are NOT treated here but in the subclasses. Transaction
0087: * management and write requests are broadcasted to all backends owning the
0088: * written table.
0089: *
0090: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0091: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0092: * </a>
0093: * @version 1.0
0094: */
0095: public abstract class RAIDb2 extends AbstractLoadBalancer {
0096: //
0097: // How the code is organized ?
0098: // 1. Member variables
0099: // 2. Constructor(s)
0100: // 3. Request handling
0101: // 4. Transaction handling
0102: // 5. Backend management
0103: //
0104:
0105: protected CreateTablePolicy createTablePolicy;
0106: protected static Trace logger = Trace
0107: .getLogger("org.continuent.sequoia.controller.loadbalancer.raidb2");
0108:
0109: /*
0110: * Constructors
0111: */
0112:
/**
 * Builds a RAIDb-2 request load balancer working at table parsing
 * granularity.
 *
 * @param vdb the virtual database this load balancer belongs to.
 * @param waitForCompletionPolicy policy telling how many backends must
 *          complete before the result is returned
 * @param createTablePolicy the policy defining how 'create table' statements
 *          should be handled
 * @exception Exception if an error occurs
 */
public RAIDb2(VirtualDatabase vdb,
        WaitForCompletionPolicy waitForCompletionPolicy,
        CreateTablePolicy createTablePolicy) throws Exception {
    super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

    // Both policies are simply recorded for later use
    this.createTablePolicy = createTablePolicy;
    this.waitForCompletionPolicy = waitForCompletionPolicy;
}
0132:
0133: /*
0134: * Request Handling
0135: */
0136:
0137: /**
0138: * Implementation specific load balanced read execution.
0139: *
0140: * @param request an <code>SelectRequest</code>
0141: * @param metadataCache the metadataCache if any or null
0142: * @return the corresponding <code>java.sql.ResultSet</code>
0143: * @exception SQLException if an error occurs
0144: */
0145: public abstract ControllerResultSet statementExecuteQuery(
0146: SelectRequest request, MetadataCache metadataCache)
0147: throws SQLException;
0148:
0149: /**
0150: * Performs a write request. This request is broadcasted to all nodes that
0151: * owns the table to be written.
0152: *
0153: * @param request an <code>AbstractWriteRequest</code>
0154: * @return number of rows affected by the request
0155: * @throws AllBackendsFailedException if all backends failed to execute the
0156: * request
0157: * @exception SQLException if an error occurs
0158: */
0159: public ExecuteUpdateResult statementExecuteUpdate(
0160: AbstractWriteRequest request)
0161: throws AllBackendsFailedException, SQLException {
0162: return ((StatementExecuteUpdateTask) execWriteRequest(request,
0163: false, null)).getResult();
0164: }
0165:
0166: /**
0167: * Perform a write request and return the auto generated keys.
0168: *
0169: * @param request the request to execute
0170: * @param metadataCache the metadataCache if any or null
0171: * @return update count and auto generated keys.
0172: * @throws AllBackendsFailedException if all backends failed to execute the
0173: * request
0174: * @exception SQLException if an error occurs
0175: */
0176: public GeneratedKeysResult statementExecuteUpdateWithKeys(
0177: AbstractWriteRequest request, MetadataCache metadataCache)
0178: throws AllBackendsFailedException, SQLException {
0179: return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(
0180: request, true, metadataCache)).getResult();
0181: }
0182:
0183: /**
0184: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
0185: * MetadataCache)
0186: */
0187: public ExecuteResult statementExecute(AbstractRequest request,
0188: MetadataCache metadataCache) throws SQLException,
0189: AllBackendsFailedException {
0190: throw new NotImplementedException(
0191: "Statement.execute() is currently not supported with RAIDb-2");
0192: }
0193:
0194: /**
0195: * Common code for execWriteRequest(AbstractWriteRequest) and
0196: * execWriteRequestWithKeys(AbstractWriteRequest).
0197: * <p>
0198: * Note that macros are processed here.
0199: * <p>
0200: * The result is given back in AbstractTask.getResult().
0201: *
0202: * @param request the request to execute
0203: * @param useKeys true if this must give an auto generated keys ResultSet
0204: * @param metadataCache the metadataCache if any or null
0205: * @throws AllBackendsFailedException if all backends failed to execute the
0206: * request
0207: * @throws SQLException if an error occurs
0208: */
0209: private AbstractTask execWriteRequest(AbstractWriteRequest request,
0210: boolean useKeys, MetadataCache metadataCache)
0211: throws AllBackendsFailedException, SQLException {
0212: // Handle macros
0213: handleMacros(request);
0214:
0215: // Total ordering mainly for distributed virtual databases.
0216: boolean removeFromTotalOrderQueue = waitForTotalOrder(request,
0217: true);
0218:
0219: // Log lazy begin if needed
0220: if (request.isLazyTransactionStart())
0221: this .vdb.getRequestManager().logLazyTransactionBegin(
0222: request.getTransactionId());
0223:
0224: // Log request
0225: recoveryLog.logRequestExecuting(request);
0226:
0227: int nbOfThreads = acquireLockAndCheckNbOfThreads(request,
0228: String.valueOf(request.getId()));
0229:
0230: List writeList = new ArrayList();
0231:
0232: if (request.isCreate()) {
0233: try {
0234: writeList = getBackendsForCreateTableRequest(request
0235: .getTableName());
0236: } catch (CreateTableException e) {
0237: releaseLockAndUnlockNextQuery(request);
0238: throw new SQLException(Translate.get(
0239: "loadbalancer.create.table.rule.failed", e
0240: .getMessage()));
0241: }
0242: } else {
0243: writeList = getBackendsWithTable(request.getTableName(),
0244: nbOfThreads);
0245: }
0246:
0247: nbOfThreads = writeList.size();
0248: if (nbOfThreads == 0) {
0249: String msg = Translate.get(
0250: "loadbalancer.execute.no.backend.found", request
0251: .getSqlShortForm(vdb
0252: .getSqlShortFormLength()));
0253: logger.warn(msg);
0254:
0255: releaseLockAndUnlockNextQuery(request);
0256: throw new SQLException(msg);
0257: }
0258: if (logger.isDebugEnabled()) {
0259: logger.debug(Translate.get(
0260: "loadbalancer.execute.on.several", String
0261: .valueOf(request.getId()), String
0262: .valueOf(nbOfThreads)));
0263: }
0264:
0265: // Create the task
0266: AbstractTask task;
0267: if (useKeys)
0268: task = new StatementExecuteUpdateWithKeysTask(
0269: getNbToWait(nbOfThreads), nbOfThreads, request,
0270: metadataCache);
0271: else
0272: task = new StatementExecuteUpdateTask(
0273: getNbToWait(nbOfThreads), nbOfThreads, request);
0274:
0275: atomicTaskPostInQueueAndReleaseLock(request, task, writeList,
0276: removeFromTotalOrderQueue);
0277:
0278: synchronized (task) {
0279: if (!task.hasCompleted())
0280: waitForTaskCompletion(request.getTimeout() * 1000L,
0281: String.valueOf(request.getId()), task);
0282:
0283: checkTaskCompletion(task);
0284: return task;
0285: }
0286: }
0287:
0288: /**
0289: * Gets a <code><DatabaseBacken>List</code> of all backends containing
0290: * the table identified by <code>tableName</code>.
0291: *
0292: * @param tableName name of the table
0293: * @param nbOfThreads number of threads
0294: * @return a <code><DatabaseBacken>List</code> of all backends
0295: * containing the table identified by <code>tableName</code>
0296: */
0297: // FIXME why don't we just iterate on the enabledBackends list (assuming we
0298: // acquire/release the read lock)?
0299: // TODO should be moved to VirtualDatabase
0300: private List getBackendsWithTable(String tableName, int nbOfThreads) {
0301: List backendsWithTable = new ArrayList();
0302: for (int i = 0; i < nbOfThreads; i++) {
0303: DatabaseBackend b = (DatabaseBackend) enabledBackends
0304: .get(i);
0305: if (b.hasTable(tableName))
0306: backendsWithTable.add(b);
0307: }
0308: return backendsWithTable;
0309: }
0310:
0311: /**
0312: * Gets a <code><DatabaseBacken>List</code> of all backends with a
0313: * transaction identified by <code>tid</code> already started.
0314: *
0315: * @param tid Transaction identifier
0316: * @param nbOfThreads number of threads
0317: * @return a <code><DatabaseBacken>List</code> of all backends with a
0318: * transaction identified by <code>tid</code> already started
0319: */
0320: // FIXME why don't we just iterate on the enabledBackends list (assuming we
0321: // acquire/release the read lock)?
0322: // TODO should be moved to VirtualDatabase
0323: private List getBackendsWithStartedTransaction(Long tid,
0324: int nbOfThreads) {
0325: // Build the list of backends that need to commit this transaction
0326: List backendsWithStartedTransaction = new ArrayList(nbOfThreads);
0327: for (int i = 0; i < nbOfThreads; i++) {
0328: DatabaseBackend backend = (DatabaseBackend) enabledBackends
0329: .get(i);
0330: if (backend.isStartedTransaction(tid))
0331: backendsWithStartedTransaction.add(backend);
0332: }
0333: return backendsWithStartedTransaction;
0334: }
0335:
0336: /**
0337: * Gets a <code><DatabaseBacken>List</code> of all backends containing
0338: * the stored procedure identified by <code>procedureName</code>.
0339: *
0340: * @param procedureName name of the stored procedure
0341: * @param nbOfParameters number of parameters of the stored procedure
0342: * @param nbOfThreads number of threads
0343: * @return a <code><DatabaseBacken>List</code> of all backends
0344: * containing the table identified by <code>tableName</code>
0345: */
0346: // FIXME why don't we just iterate on the enabledBackends list (assuming we
0347: // acquire/release the read lock)?
0348: // TODO should be moved to VirtualDatabase
0349: private List getBackendsWithStoredProcedure(String procedureName,
0350: int nbOfParameters, int nbOfThreads) {
0351: List backendsWithStoredProcedure = new ArrayList(nbOfThreads);
0352: for (int i = 0; i < nbOfThreads; i++) {
0353: DatabaseBackend b = (DatabaseBackend) enabledBackends
0354: .get(i);
0355: if (b.hasStoredProcedure(procedureName, nbOfParameters))
0356: backendsWithStoredProcedure.add(b);
0357: }
0358: return backendsWithStoredProcedure;
0359: }
0360:
0361: /**
0362: * Gets a <code><DatabaseBacken>List</code> of all backends according
0363: * to the create table policy.
0364: *
0365: * @param tableName the name of the table to create
0366: * @return a <code><DatabaseBacken>List</code> of all backends
0367: * according to the create table policy.
0368: * @throws CreateTableException if an error occurs while getting the backends
0369: * according to the create table policy
0370: */
0371: // TODO should be moved to VirtualDatabase
0372: private List getBackendsForCreateTableRequest(String tableName)
0373: throws CreateTableException {
0374: // Choose the backend according to the defined policy
0375: CreateTableRule rule = createTablePolicy
0376: .getTableRule(tableName);
0377: if (rule == null) {
0378: rule = createTablePolicy.getDefaultRule();
0379: }
0380: return rule.getBackends(vdb.getBackends());
0381: }
0382:
0383: /**
0384: * Executes a select request <em>within a transation</em> on a given
0385: * backend. The transaction is lazily begun if it was not already started.
0386: *
0387: * @param request the Select request to execute
0388: * @param backend the backend on which the request must be executed
0389: * @param metadataCache the metadata cache
0390: * @return a <code>ControllerResultSet</code>
0391: * @throws SQLException if an error occurs while executing the request
0392: */
0393: private ControllerResultSet executeSelectRequestInTransaction(
0394: SelectRequest request, DatabaseBackend backend,
0395: MetadataCache metadataCache) throws SQLException {
0396: AbstractConnectionManager cm = backend
0397: .getConnectionManager(request.getLogin());
0398:
0399: if (cm == null) {
0400: String msg = Translate.get(
0401: "loadbalancer.connectionmanager.not.found", request
0402: .getLogin(), backend.getName());
0403: logger.error(msg);
0404: throw new SQLException(msg);
0405: }
0406:
0407: Connection c;
0408: long tid = request.getTransactionId();
0409:
0410: try {
0411: c = backend
0412: .getConnectionForTransactionAndLazyBeginIfNeeded(
0413: request, cm);
0414: } catch (UnreachableBackendException e1) {
0415: logger.error(Translate.get(
0416: "loadbalancer.backend.disabling.unreachable",
0417: backend.getName()));
0418: disableBackend(backend, true);
0419: throw new SQLException(Translate.get(
0420: "loadbalancer.backend.unreacheable", backend
0421: .getName()));
0422: } catch (NoTransactionStartWhenDisablingException e) {
0423: String msg = Translate.get(
0424: "loadbalancer.backend.is.disabling", request
0425: .getSqlShortForm(vdb
0426: .getSqlShortFormLength()), backend
0427: .getName());
0428: logger.error(msg);
0429: throw new SQLException(msg);
0430: }
0431:
0432: if (c == null)
0433: throw new SQLException(Translate.get(
0434: "loadbalancer.unable.retrieve.connection", String
0435: .valueOf(tid), backend.getName()));
0436:
0437: try {
0438: ControllerResultSet rs = executeStatementExecuteQueryOnBackend(
0439: request, backend, null, c, metadataCache);
0440: if (logger.isDebugEnabled()) {
0441: logger.debug(Translate.get(
0442: "loadbalancer.execute.transaction.on",
0443: new String[] { String.valueOf(tid),
0444: String.valueOf(request.getId()),
0445: backend.getName() }));
0446: }
0447: return rs;
0448: } catch (SQLException e) {
0449: throw e;
0450: } catch (BadConnectionException e) { // Connection failed, so did the transaction
0451: // Disable the backend.
0452: cm.deleteConnection(tid);
0453: String msg = Translate
0454: .get(
0455: "loadbalancer.backend.disabling.connection.failure",
0456: backend.getName());
0457: logger.error(msg);
0458: disableBackend(backend, true);
0459: throw new SQLException(msg);
0460: } catch (Throwable e) {
0461: throw new SQLException(Translate.get(
0462: "loadbalancer.request.failed.on.backend",
0463: new String[] {
0464: request.getSqlShortForm(vdb
0465: .getSqlShortFormLength()),
0466: backend.getName(), e.getMessage() }));
0467: }
0468: }
0469:
0470: /**
0471: * Executes a select request <em>in autocommit mode</em> on a given backend.
0472: *
0473: * @param request the Select request to execute
0474: * @param backend the backend on which the request must be executed
0475: * @param metadataCache the metadata cache
0476: * @return a <code>ControllerResultSet</code>
0477: * @throws SQLException if an error occurs while executing the request
0478: * @throws UnreachableBackendException if the backend is not reachable
0479: */
0480: private ControllerResultSet executeSelectRequestInAutoCommit(
0481: SelectRequest request, DatabaseBackend backend,
0482: MetadataCache metadataCache) throws SQLException,
0483: UnreachableBackendException {
0484: AbstractConnectionManager cm = backend
0485: .getConnectionManager(request.getLogin());
0486:
0487: // Sanity check
0488: if (cm == null) {
0489: String msg = Translate.get(
0490: "loadbalancer.connectionmanager.not.found", request
0491: .getLogin(), backend.getName());
0492: logger.error(msg);
0493: throw new SQLException(msg);
0494: }
0495:
0496: ControllerResultSet rs = null;
0497: boolean badConnection;
0498: do {
0499: badConnection = false;
0500: // Use a connection just for this request
0501: PooledConnection c = null;
0502: try {
0503: c = cm.retrieveConnectionInAutoCommit(request);
0504: } catch (UnreachableBackendException e1) {
0505: logger.error(Translate.get(
0506: "loadbalancer.backend.disabling.unreachable",
0507: backend.getName()));
0508: disableBackend(backend, true);
0509: throw new UnreachableBackendException(Translate.get(
0510: "loadbalancer.backend.unreacheable", backend
0511: .getName()));
0512: }
0513:
0514: // Sanity check
0515: if (c == null)
0516: throw new UnreachableBackendException(
0517: "No more connections on backend "
0518: + backend.getName());
0519:
0520: // Execute Query
0521: try {
0522: rs = executeStatementExecuteQueryOnBackend(request,
0523: backend, null, c.getConnection(), metadataCache);
0524: cm.releaseConnectionInAutoCommit(request, c);
0525: } catch (SQLException e) {
0526: cm.releaseConnectionInAutoCommit(request, c);
0527: throw new SQLException(Translate.get(
0528: "loadbalancer.request.failed.on.backend",
0529: new String[] {
0530: request.getSqlShortForm(vdb
0531: .getSqlShortFormLength()),
0532: backend.getName(), e.getMessage() }));
0533: } catch (BadConnectionException e) { // Get rid of the bad connection
0534: cm.deleteConnection(c);
0535: badConnection = true;
0536: } catch (Throwable e) {
0537: throw new SQLException(Translate.get(
0538: "loadbalancer.request.failed.on.backend",
0539: new String[] {
0540: request.getSqlShortForm(vdb
0541: .getSqlShortFormLength()),
0542: backend.getName(), e.getMessage() }));
0543: }
0544: } while (badConnection);
0545: if (logger.isDebugEnabled())
0546: logger
0547: .debug(Translate.get("loadbalancer.execute.on",
0548: String.valueOf(request.getId()), backend
0549: .getName()));
0550: return rs;
0551: }
0552:
0553: /**
0554: * Execute a read request on the selected backend.
0555: *
0556: * @param request the request to execute
0557: * @param backend the backend that will execute the request
0558: * @param metadataCache a metadataCache if any or null
0559: * @return the ResultSet
0560: * @throws SQLException if an error occurs
0561: */
0562: protected ControllerResultSet executeReadRequestOnBackend(
0563: SelectRequest request, DatabaseBackend backend,
0564: MetadataCache metadataCache) throws SQLException,
0565: UnreachableBackendException {
0566: // Handle macros
0567: handleMacros(request);
0568:
0569: // Execute the query
0570: if (request.isAutoCommit()) {
0571: return executeSelectRequestInAutoCommit(request, backend,
0572: metadataCache);
0573: } else {
0574: return executeSelectRequestInTransaction(request, backend,
0575: metadataCache);
0576: }
0577: }
0578:
0579: protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1;
0580: protected static final int CALLABLE_STATEMENT_EXECUTE = 2;
0581:
0582: /**
0583: * Execute a stored procedure on the selected backend.
0584: *
0585: * @param proc the stored procedure to execute
0586: * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0587: * false if we must call CallableStatement.execute()
0588: * @param backend the backend that will execute the request
0589: * @param metadataCache the metadataCache if any or null
0590: * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0591: * <code>ExecuteResult</code> object otherwise
0592: * @throws SQLException if an error occurs
0593: */
0594: protected Object executeStoredProcedureOnBackend(
0595: StoredProcedure proc, boolean isExecuteQuery,
0596: DatabaseBackend backend, MetadataCache metadataCache)
0597: throws SQLException, UnreachableBackendException {
0598: // Execute the query
0599: if (proc.isAutoCommit()) {
0600: return executeStoredProcedureInAutoCommit(proc,
0601: isExecuteQuery, backend, metadataCache);
0602: } else {
0603: return executeStoredProcedureInTransaction(proc,
0604: isExecuteQuery, backend, metadataCache);
0605: }
0606: }
0607:
0608: /**
0609: * Executes a stored procedure <em>within a transation</em> on the selected
0610: * backend. The transaction is lazily started if it has not already begun.
0611: *
0612: * @param proc the <em>autocommit</em> stored procedure to execute
0613: * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0614: * false if we must call CallableStatement.execute()
0615: * @param backend the backend that will execute the request
0616: * @param metadataCache the metadataCache if any or null
0617: * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0618: * <code>ExecuteResult</code> object otherwise
0619: * @throws SQLException if an error occurs
0620: */
0621: private Object executeStoredProcedureInTransaction(
0622: StoredProcedure proc, boolean isExecuteQuery,
0623: DatabaseBackend backend, MetadataCache metadataCache)
0624: throws SQLException {
0625: // Inside a transaction
0626: // Ok, we have a backend, let's execute the request
0627: AbstractConnectionManager cm = backend
0628: .getConnectionManager(proc.getLogin());
0629:
0630: // Sanity check
0631: if (cm == null) {
0632: String msg = Translate.get(
0633: "loadbalancer.connectionmanager.not.found", proc
0634: .getLogin(), backend.getName());
0635: logger.error(msg);
0636: throw new SQLException(msg);
0637: }
0638:
0639: Connection c;
0640: long tid = proc.getTransactionId();
0641:
0642: try {
0643: c = backend
0644: .getConnectionForTransactionAndLazyBeginIfNeeded(
0645: proc, cm);
0646: } catch (UnreachableBackendException e1) {
0647: logger.error(Translate.get(
0648: "loadbalancer.backend.disabling.unreachable",
0649: backend.getName()));
0650: disableBackend(backend, true);
0651: throw new SQLException(Translate.get(
0652: "loadbalancer.backend.unreacheable", backend
0653: .getName()));
0654: } catch (NoTransactionStartWhenDisablingException e) {
0655: String msg = Translate.get(
0656: "loadbalancer.backend.is.disabling", proc
0657: .getSqlShortForm(vdb
0658: .getSqlShortFormLength()), backend
0659: .getName());
0660: logger.error(msg);
0661: throw new SQLException(msg);
0662: }
0663:
0664: // Sanity check
0665: if (c == null)
0666: throw new SQLException(Translate.get(
0667: "loadbalancer.unable.retrieve.connection", String
0668: .valueOf(tid), backend.getName()));
0669:
0670: // Execute Query
0671: try {
0672: if (isExecuteQuery)
0673: return AbstractLoadBalancer
0674: .executeCallableStatementExecuteQueryOnBackend(
0675: proc, backend, null, c, metadataCache);
0676: else
0677: return AbstractLoadBalancer
0678: .executeCallableStatementExecuteOnBackend(
0679: proc,
0680: backend,
0681: null,
0682: cm
0683: .retrieveConnectionForTransaction(tid),
0684: metadataCache);
0685: } catch (Exception e) {
0686: throw new SQLException(Translate.get(
0687: "loadbalancer.storedprocedure.failed.on.backend",
0688: new String[] {
0689: proc.getSqlShortForm(vdb
0690: .getSqlShortFormLength()),
0691: backend.getName(), e.getMessage() }));
0692: } finally {
0693: if (logger.isDebugEnabled())
0694: logger.debug(Translate.get(
0695: "loadbalancer.execute.transaction.on",
0696: new String[] { String.valueOf(tid),
0697: String.valueOf(proc.getId()),
0698: backend.getName() }));
0699: }
0700: }
0701:
0702: /**
0703: * Executes a stored procedure <em>in autocommit mode</em> on the selected
0704: * backend.
0705: *
0706: * @param proc the <em>autocommit</em> stored procedure to execute
0707: * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
0708: * false if we must call CallableStatement.execute()
0709: * @param backend the backend that will execute the request
0710: * @param metadataCache the metadataCache if any or null
0711: * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
0712: * <code>ExecuteResult</code> object otherwise
0713: * @throws SQLException if an error occurs
0714: */
0715: private Object executeStoredProcedureInAutoCommit(
0716: StoredProcedure proc, boolean isExecuteQuery,
0717: DatabaseBackend backend, MetadataCache metadataCache)
0718: throws SQLException, UnreachableBackendException {
0719: // Ok, we have a backend, let's execute the request
0720: AbstractConnectionManager cm = backend
0721: .getConnectionManager(proc.getLogin());
0722:
0723: // Sanity check
0724: if (cm == null) {
0725: String msg = Translate.get(
0726: "loadbalancer.connectionmanager.not.found", proc
0727: .getLogin(), backend.getName());
0728: logger.error(msg);
0729: throw new SQLException(msg);
0730: }
0731:
0732: // Use a connection just for this request
0733: PooledConnection c = null;
0734: try {
0735: c = cm.retrieveConnectionInAutoCommit(proc);
0736: } catch (UnreachableBackendException e1) {
0737: logger.error(Translate.get(
0738: "loadbalancer.backend.disabling.unreachable",
0739: backend.getName()));
0740: disableBackend(backend, true);
0741: throw new UnreachableBackendException(Translate.get(
0742: "loadbalancer.backend.unreacheable", backend
0743: .getName()));
0744: }
0745:
0746: // Sanity check
0747: if (c == null)
0748: throw new SQLException(Translate.get(
0749: "loadbalancer.backend.no.connection", backend
0750: .getName()));
0751:
0752: // Execute Query
0753: try {
0754: if (isExecuteQuery)
0755: return AbstractLoadBalancer
0756: .executeCallableStatementExecuteQueryOnBackend(
0757: proc, backend, null, c.getConnection(),
0758: metadataCache);
0759: else
0760: return AbstractLoadBalancer
0761: .executeCallableStatementExecuteOnBackend(proc,
0762: backend, null, c, metadataCache);
0763: } catch (Exception e) {
0764: throw new SQLException(Translate.get(
0765: "loadbalancer.storedprocedure.failed.on.backend",
0766: new String[] {
0767: proc.getSqlShortForm(vdb
0768: .getSqlShortFormLength()),
0769: backend.getName(), e.getMessage() }));
0770: } finally {
0771: cm.releaseConnectionInAutoCommit(proc, c);
0772: if (logger.isDebugEnabled())
0773: logger.debug(Translate.get(
0774: "loadbalancer.storedprocedure.on", String
0775: .valueOf(proc.getId()), backend
0776: .getName()));
0777: }
0778: }
0779:
0780: /**
0781: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
0782: * MetadataCache)
0783: */
0784: public ControllerResultSet callableStatementExecuteQuery(
0785: StoredProcedure proc, MetadataCache metadataCache)
0786: throws SQLException, AllBackendsFailedException {
0787: CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure(
0788: proc, EXECUTE_QUERY_TASK, metadataCache);
0789: return task.getResult();
0790: }
0791:
0792: /**
0793: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0794: */
0795: public ExecuteUpdateResult callableStatementExecuteUpdate(
0796: StoredProcedure proc) throws SQLException,
0797: AllBackendsFailedException {
0798: CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure(
0799: proc, EXECUTE_UPDATE_TASK, null);
0800: return task.getResult();
0801: }
0802:
0803: /**
0804: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(StoredProcedure,
0805: * MetadataCache)
0806: */
0807: public ExecuteResult callableStatementExecute(StoredProcedure proc,
0808: MetadataCache metadataCache) throws SQLException,
0809: AllBackendsFailedException {
0810: CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure(
0811: proc, EXECUTE_TASK, metadataCache);
0812: return task.getResult();
0813: }
0814:
0815: private static final int EXECUTE_QUERY_TASK = 0;
0816: private static final int EXECUTE_UPDATE_TASK = 1;
0817: private static final int EXECUTE_TASK = 2;
0818:
0819: /**
0820: * Post the stored procedure call in the threads task list.
0821: * <p>
0822: * Note that macros are also processed here.
0823: *
0824: * @param proc the stored procedure to call
0825: * @param taskType one of EXECUTE_QUERY_TASK, EXECUTE_UPDATE_TASK or
0826: * EXECUTE_TASK
0827: * @param metadataCache the metadataCache if any or null
0828: * @return the task that has been executed (caller can get the result by
0829: * calling getResult())
0830: * @throws SQLException if an error occurs
0831: * @throws AllBackendsFailedException if all backends failed to execute the
0832: * stored procedure
0833: * @throws NoMoreBackendException if no backend was available to execute the
0834: * stored procedure
0835: */
0836: private AbstractTask callStoredProcedure(StoredProcedure proc,
0837: int taskType, MetadataCache metadataCache)
0838: throws SQLException, AllBackendsFailedException,
0839: NoMoreBackendException {
0840: // Total ordering mainly for distributed virtual databases.
0841: boolean removeFromTotalOrderQueue = waitForTotalOrder(proc,
0842: true);
0843:
0844: // Handle macros
0845: handleMacros(proc);
0846:
0847: int nbOfThreads = acquireLockAndCheckNbOfThreads(proc, String
0848: .valueOf(proc.getId()));
0849:
0850: // Create the task
0851: AbstractTask task;
0852: switch (taskType) {
0853: case EXECUTE_QUERY_TASK:
0854: task = new CallableStatementExecuteQueryTask(
0855: getNbToWait(nbOfThreads), nbOfThreads, proc,
0856: metadataCache);
0857: break;
0858: case EXECUTE_UPDATE_TASK:
0859: task = new CallableStatementExecuteUpdateTask(
0860: getNbToWait(nbOfThreads), nbOfThreads, proc);
0861: break;
0862: case EXECUTE_TASK:
0863: task = new CallableStatementExecuteTask(
0864: getNbToWait(nbOfThreads), nbOfThreads, proc,
0865: metadataCache);
0866: break;
0867: default:
0868: throw new RuntimeException("Unhandled task type "
0869: + taskType + " in callStoredProcedure");
0870: }
0871:
0872: List backendList = getBackendsWithStoredProcedure(proc
0873: .getProcedureKey(), proc.getNbOfParameters(),
0874: nbOfThreads);
0875:
0876: if (backendList.size() == 0) {
0877: releaseLockAndUnlockNextQuery(proc);
0878: throw new SQLException(Translate.get(
0879: "loadbalancer.backend.no.required.storedprocedure",
0880: proc.getProcedureKey()));
0881: }
0882:
0883: task.setTotalNb(backendList.size());
0884: atomicTaskPostInQueueAndReleaseLock(proc, task, backendList,
0885: removeFromTotalOrderQueue);
0886:
0887: synchronized (task) {
0888: if (!task.hasCompleted())
0889: waitForTaskCompletion(proc.getTimeout() * 1000L, String
0890: .valueOf(proc.getId()), task);
0891:
0892: checkTaskCompletion(task);
0893: return task;
0894: }
0895: }
0896:
0897: /**
0898: * Check the completion status of the task and throws appropriate Exceptions
0899: * if the status of the task was not successful, otherwise do nothing.
0900: *
0901: * @param task the completed AbstractTask
0902: * @throws AllBackendsFailedException if all backends failed to execute the
0903: * request
0904: * @exception NoMoreBackendException if no backends are left to execute the
0905: * request
0906: * @exception SQLException if an error occurs
0907: */
0908: private void checkTaskCompletion(AbstractTask task)
0909: throws NoMoreBackendException, AllBackendsFailedException,
0910: SQLException {
0911: AbstractRequest request = task.getRequest();
0912:
0913: if (task.getSuccess() > 0)
0914: return;
0915:
0916: // Check that someone failed, it might be the case that we only have
0917: // disabling backends left and they have not played this query (thus
0918: // none of them have succeeded or failed).
0919: if (task.getFailed() == 0) {
0920: throw new NoMoreBackendException(Translate
0921: .get("loadbalancer.backendlist.empty"));
0922: }
0923:
0924: if (task.getSuccess() == 0) {
0925: // All backends that executed the query failed
0926: List exceptions = task.getExceptions();
0927: if (exceptions == null)
0928: throw new AllBackendsFailedException(Translate.get(
0929: "loadbalancer.request.failed.all",
0930: new Object[] { request.getType(),
0931: String.valueOf(request.getId()) }));
0932: else {
0933: String errorMsg = Translate.get(
0934: "loadbalancer.request.failed.stack",
0935: new Object[] { request.getType(),
0936: String.valueOf(request.getId()) })
0937: + "\n";
0938: SQLException ex = SQLExceptionFactory.getSQLException(
0939: exceptions, errorMsg);
0940: logger.error(ex.getMessage());
0941: throw ex;
0942: }
0943: }
0944: }
0945:
0946: /**
0947: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
0948: */
0949: public ControllerResultSet getPreparedStatementGetMetaData(
0950: AbstractRequest request) throws SQLException {
0951: // Choose a backend
0952: try {
0953: vdb.acquireReadLockBackendLists();
0954: } catch (InterruptedException e) {
0955: String msg = Translate.get(
0956: "loadbalancer.backendlist.acquire.readlock.failed",
0957: e);
0958: logger.error(msg);
0959: throw new SQLException(msg);
0960: }
0961:
0962: /*
0963: * The backend that will execute the query
0964: */
0965: DatabaseBackend backend = null;
0966:
0967: // Note that vdb lock is released in the finally clause of this try/catch
0968: // block
0969: try {
0970: ArrayList backends = vdb.getBackends();
0971: int size = backends.size();
0972:
0973: if (size == 0)
0974: throw new SQLException(Translate.get(
0975: "loadbalancer.execute.no.backend.available",
0976: request.getId()));
0977:
0978: // Choose the first available backend
0979: for (int i = 0; i < size; i++) {
0980: DatabaseBackend b = (DatabaseBackend) backends.get(i);
0981: if (b.isReadEnabled()) {
0982: backend = b;
0983: break;
0984: }
0985: }
0986: } catch (Throwable e) {
0987: String msg = Translate.get(
0988: "loadbalancer.execute.find.backend.failed",
0989: new String[] {
0990: request.getSqlShortForm(vdb
0991: .getSqlShortFormLength()),
0992: e.getMessage() });
0993: logger.error(msg, e);
0994: throw new SQLException(msg);
0995: } finally {
0996: vdb.releaseReadLockBackendLists();
0997: }
0998:
0999: if (backend == null)
1000: throw new NoMoreBackendException(Translate.get(
1001: "loadbalancer.execute.no.backend.enabled", request
1002: .getId()));
1003:
1004: // Ok, we have a backend, let's execute the request
1005: AbstractConnectionManager cm = backend
1006: .getConnectionManager(request.getLogin());
1007:
1008: // Sanity check
1009: if (cm == null) {
1010: String msg = Translate.get(
1011: "loadbalancer.connectionmanager.not.found",
1012: new String[] { request.getLogin(),
1013: backend.getName() });
1014: logger.error(msg);
1015: throw new SQLException(msg);
1016: }
1017:
1018: // Execute the query
1019: if (request.isAutoCommit()) {
1020: ControllerResultSet rs = null;
1021: boolean badConnection;
1022: do {
1023: badConnection = false;
1024: // Use a connection just for this request
1025: PooledConnection c = null;
1026: try {
1027: c = cm.retrieveConnectionInAutoCommit(request);
1028: } catch (UnreachableBackendException e1) {
1029: logger
1030: .error(Translate
1031: .get(
1032: "loadbalancer.backend.disabling.unreachable",
1033: backend.getName()));
1034: disableBackend(backend, true);
1035: // Retry on a different backend
1036: return getPreparedStatementGetMetaData(request);
1037: }
1038:
1039: // Sanity check
1040: if (c == null)
1041: throw new SQLException(Translate.get(
1042: "loadbalancer.backend.no.connection",
1043: backend.getName()));
1044:
1045: // Execute Query
1046: try {
1047: rs = preparedStatementGetMetaDataOnBackend(request
1048: .getSqlOrTemplate(), backend, c
1049: .getConnection());
1050: cm.releaseConnectionInAutoCommit(request, c);
1051: } catch (SQLException e) {
1052: cm.releaseConnectionInAutoCommit(request, c);
1053: throw SQLExceptionFactory
1054: .getSQLException(
1055: e,
1056: Translate
1057: .get(
1058: "loadbalancer.request.failed.on.backend",
1059: new String[] {
1060: request
1061: .getSqlShortForm(vdb
1062: .getSqlShortFormLength()),
1063: backend
1064: .getName(),
1065: e
1066: .getMessage() }));
1067: } catch (BadConnectionException e) { // Get rid of the bad connection
1068: cm.deleteConnection(c);
1069: badConnection = true;
1070: } catch (Throwable e) {
1071: cm.releaseConnectionInAutoCommit(request, c);
1072: throw new SQLException(
1073: Translate
1074: .get(
1075: "loadbalancer.request.failed.on.backend",
1076: new String[] {
1077: request
1078: .getSqlShortForm(vdb
1079: .getSqlShortFormLength()),
1080: backend.getName(),
1081: e.getMessage() }));
1082: }
1083: } while (badConnection);
1084: if (logger.isDebugEnabled())
1085: logger.debug(Translate.get("loadbalancer.execute.on",
1086: new String[] { String.valueOf(request.getId()),
1087: backend.getName() }));
1088: return rs;
1089: } else { // Inside a transaction
1090: Connection c;
1091: long tid = request.getTransactionId();
1092:
1093: try {
1094: c = backend
1095: .getConnectionForTransactionAndLazyBeginIfNeeded(
1096: request, cm);
1097: } catch (UnreachableBackendException e1) {
1098: logger.error(Translate.get(
1099: "loadbalancer.backend.disabling.unreachable",
1100: backend.getName()));
1101: disableBackend(backend, true);
1102: throw new NoMoreBackendException(Translate.get(
1103: "loadbalancer.backend.unreacheable", backend
1104: .getName()));
1105: } catch (NoTransactionStartWhenDisablingException e) {
1106: String msg = Translate.get(
1107: "loadbalancer.backend.is.disabling",
1108: new String[] {
1109: request.getSqlShortForm(vdb
1110: .getSqlShortFormLength()),
1111: backend.getName() });
1112: logger.error(msg);
1113: throw new SQLException(msg);
1114: }
1115:
1116: // Sanity check
1117: if (c == null)
1118: throw new SQLException(Translate.get(
1119: "loadbalancer.unable.retrieve.connection",
1120: new String[] { String.valueOf(tid),
1121: backend.getName() }));
1122:
1123: // Execute Query
1124: ControllerResultSet rs = null;
1125: try {
1126: rs = preparedStatementGetMetaDataOnBackend(request
1127: .getSqlOrTemplate(), backend, c);
1128: } catch (SQLException e) {
1129: throw e;
1130: } catch (BadConnectionException e) { // Connection failed, so did the transaction
1131: // Disable the backend.
1132: cm.deleteConnection(tid);
1133: String msg = Translate
1134: .get(
1135: "loadbalancer.backend.disabling.connection.failure",
1136: backend.getName());
1137: logger.error(msg);
1138: disableBackend(backend, true);
1139: throw new SQLException(msg);
1140: } catch (Throwable e) {
1141: throw new SQLException(Translate.get(
1142: "loadbalancer.request.failed.on.backend",
1143: new String[] {
1144: request.getSqlShortForm(vdb
1145: .getSqlShortFormLength()),
1146: backend.getName(), e.getMessage() }));
1147: }
1148: if (logger.isDebugEnabled())
1149: logger.debug(Translate.get(
1150: "loadbalancer.execute.transaction.on",
1151: new String[] { String.valueOf(tid),
1152: String.valueOf(request.getId()),
1153: backend.getName() }));
1154: return rs;
1155: }
1156: }
1157:
1158: /*
1159: * Transaction management
1160: */
1161: /**
1162: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1163: */
1164: public void abort(TransactionMetaData tm) throws SQLException {
1165: long tid = tm.getTransactionId();
1166:
1167: DistributedRollback toqObject = null;
1168: /*
1169: * Let previous queries be flushed into the load balancer queues so that we
1170: * can abort them and that no queries for that transaction wait in the total
1171: * order queue while we are aborting. Note that the wait and remove from the
1172: * total order queue will be done in the call to rollback at the end of this
1173: * method.
1174: */
1175: if (vdb.getTotalOrderQueue() != null) {
1176: toqObject = new DistributedRollback(tm.getLogin(), tid);
1177: waitForTotalOrder(toqObject, false);
1178: }
1179:
1180: // Acquire the lock
1181: String requestDescription = "abort " + tid;
1182: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1183: requestDescription);
1184:
1185: boolean rollbackInProgress = false;
1186: synchronized (enabledBackends) {
1187: // Abort all queries on all backends that have started this transaction
1188: for (int i = 0; i < nbOfThreads; i++) {
1189: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1190: .get(i);
1191: rollbackInProgress = rollbackInProgress
1192: || backend.getTaskQueues()
1193: .abortAllQueriesForTransaction(tid);
1194: }
1195: }
1196:
1197: // Release the lock
1198: backendListLock.releaseRead();
1199:
1200: if (rollbackInProgress) { // already aborting
1201: if (vdb.getTotalOrderQueue() != null)
1202: removeObjectFromAndNotifyTotalOrderQueue(toqObject);
1203: return;
1204: }
1205:
1206: rollback(tm);
1207: }
1208:
1209: /**
1210: * Begins a new transaction.
1211: *
1212: * @param tm the transaction marker metadata
1213: * @exception SQLException if an error occurs
1214: */
1215: public final void begin(TransactionMetaData tm) throws SQLException {
1216: }
1217:
1218: /**
1219: * Commits a transaction.
1220: *
1221: * @param tm the transaction marker metadata
1222: * @exception SQLException if an error occurs
1223: */
1224: public void commit(TransactionMetaData tm) throws SQLException {
1225: long tid = tm.getTransactionId();
1226:
1227: // Ordering for distributed virtual database
1228: boolean canTakeReadLock = false;
1229: DistributedCommit totalOrderCommit = null;
1230: if (vdb.getTotalOrderQueue() != null) {
1231: // Total ordering mainly for distributed virtual databases.
1232: // If waitForTotalOrder returns true then the query has been scheduled in
1233: // total order and there is no need to take a write lock later to resolve
1234: // potential conflicts.
1235: totalOrderCommit = new DistributedCommit(tm.getLogin(), tid);
1236: canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
1237: if (!canTakeReadLock)
1238: // This is a local commit no total order info
1239: totalOrderCommit = null;
1240: }
1241:
1242: // Acquire the lock
1243: int nbOfThreads = acquireLockAndCheckNbOfThreads(
1244: totalOrderCommit, "commit " + tid);
1245:
1246: List commitList = getBackendsWithStartedTransaction(new Long(
1247: tid), nbOfThreads);
1248:
1249: int nbOfThreadsToCommit = commitList.size();
1250: CommitTask task = null;
1251: if (nbOfThreadsToCommit != 0) {
1252: task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1253: nbOfThreadsToCommit, tm);
1254: }
1255: // FIXME waht if nbOfThreadsToCommit == 0?
1256:
1257: // Post the task in the non-conflicting queues.
1258: synchronized (enabledBackends) {
1259: for (int i = 0; i < nbOfThreadsToCommit; i++) {
1260: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1261: .get(i);
1262: backend.getTaskQueues()
1263: .addTaskToBackendTotalOrderQueue(task);
1264: }
1265: }
1266:
1267: // Release the lock
1268: backendListLock.releaseRead();
1269:
1270: // Unblock next query from total order queue
1271: if (totalOrderCommit != null)
1272: removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1273:
1274: waitForCommit(task, tm.getTimeout());
1275: }
1276:
1277: /**
1278: * Rollbacks a transaction.
1279: *
1280: * @param tm the transaction marker metadata
1281: * @exception SQLException if an error occurs
1282: */
1283: public void rollback(TransactionMetaData tm) throws SQLException {
1284: long tid = tm.getTransactionId();
1285:
1286: // Ordering for distributed virtual database
1287: DistributedRollback totalOrderRollback = null;
1288: boolean canTakeReadLock = false;
1289: if (vdb.getTotalOrderQueue() != null) {
1290: totalOrderRollback = new DistributedRollback(tm.getLogin(),
1291: tid);
1292: // Total ordering mainly for distributed virtual databases.
1293: // If waitForTotalOrder returns true then the query has been scheduled in
1294: // total order and there is no need to take a write lock later to resolve
1295: // potential conflicts.
1296: canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1297: false);
1298: if (!canTakeReadLock)
1299: // This is a local rollback no total order info
1300: totalOrderRollback = null;
1301: }
1302:
1303: // Acquire the lock
1304: int nbOfThreads = acquireLockAndCheckNbOfThreads(
1305: totalOrderRollback, "rollback " + tid);
1306:
1307: // Build the list of backends that need to rollback this transaction
1308: List rollbackList = getBackendsWithStartedTransaction(new Long(
1309: tid), nbOfThreads);
1310:
1311: int nbOfThreadsToRollback = rollbackList.size();
1312: RollbackTask task = null;
1313: if (nbOfThreadsToRollback != 0)
1314: task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1315: nbOfThreadsToRollback, tm);
1316:
1317: // Post the task in the non-conflicting queues.
1318: synchronized (enabledBackends) {
1319: for (int i = 0; i < nbOfThreadsToRollback; i++) {
1320: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1321: .get(i);
1322: backend.getTaskQueues()
1323: .addTaskToBackendTotalOrderQueue(task);
1324: }
1325: }
1326:
1327: // Release the lock
1328: backendListLock.releaseRead();
1329:
1330: // Unblock next query from total order queue
1331: if (totalOrderRollback != null)
1332: removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1333:
1334: waitForRollback(task, tm.getTimeout());
1335: }
1336:
1337: /**
1338: * Rollback a transaction to a savepoint
1339: *
1340: * @param tm The transaction marker metadata
1341: * @param savepointName The name of the savepoint
1342: * @throws SQLException if an error occurs
1343: */
1344: public void rollbackToSavepoint(TransactionMetaData tm,
1345: String savepointName) throws SQLException {
1346: long tid = tm.getTransactionId();
1347:
1348: // Ordering for distributed virtual database
1349: DistributedRollbackToSavepoint totalOrderRollback = null;
1350: boolean canTakeReadLock = false;
1351: if (vdb.getTotalOrderQueue() != null) {
1352: totalOrderRollback = new DistributedRollbackToSavepoint(
1353: tid, savepointName);
1354: // Total ordering mainly for distributed virtual databases.
1355: // If waitForTotalOrder returns true then the query has been scheduled in
1356: // total order and there is no need to take a write lock later to resolve
1357: // potential conflicts.
1358: canTakeReadLock = waitForTotalOrder(totalOrderRollback,
1359: false);
1360: if (!canTakeReadLock)
1361: // This is a local commit no total order info
1362: totalOrderRollback = null;
1363: }
1364:
1365: // Acquire the lock
1366: int nbOfThreads = acquireLockAndCheckNbOfThreads(
1367: totalOrderRollback, "rollback " + savepointName + " "
1368: + tid);
1369:
1370: // Build the list of backends that need to rollback this transaction
1371: List rollbackList = getBackendsWithStartedTransaction(new Long(
1372: tid), nbOfThreads);
1373:
1374: int nbOfThreadsToRollback = rollbackList.size();
1375: RollbackToSavepointTask task = null;
1376: if (nbOfThreadsToRollback != 0)
1377: task = new RollbackToSavepointTask(
1378: getNbToWait(nbOfThreadsToRollback),
1379: nbOfThreadsToRollback, tm, savepointName);
1380:
1381: // Post the task in the non-conflicting queues.
1382: synchronized (enabledBackends) {
1383: for (int i = 0; i < nbOfThreadsToRollback; i++) {
1384: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1385: .get(i);
1386: backend.getTaskQueues()
1387: .addTaskToBackendTotalOrderQueue(task);
1388: }
1389: }
1390:
1391: // Release the lock
1392: backendListLock.releaseRead();
1393:
1394: // Unblock next query from total order queue
1395: if (totalOrderRollback != null)
1396: removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1397:
1398: waitForRollbackToSavepoint(task, savepointName, tm.getTimeout());
1399: }
1400:
1401: /**
1402: * Release a savepoint from a transaction
1403: *
1404: * @param tm The transaction marker metadata
1405: * @param savepointName The name of the savepoint ro release
1406: * @throws SQLException if an error occurs
1407: */
1408: public void releaseSavepoint(TransactionMetaData tm,
1409: String savepointName) throws SQLException {
1410: long tid = tm.getTransactionId();
1411:
1412: // Ordering for distributed virtual database
1413: DistributedReleaseSavepoint totalOrderRelease = null;
1414: boolean canTakeReadLock = false;
1415: if (vdb.getTotalOrderQueue() != null) {
1416: totalOrderRelease = new DistributedReleaseSavepoint(tid,
1417: savepointName);
1418: // Total ordering mainly for distributed virtual databases.
1419: // If waitForTotalOrder returns true then the query has been scheduled in
1420: // total order and there is no need to take a write lock later to resolve
1421: // potential conflicts.
1422: canTakeReadLock = waitForTotalOrder(totalOrderRelease,
1423: false);
1424: if (!canTakeReadLock)
1425: // This is a local commit no total order info
1426: totalOrderRelease = null;
1427: }
1428:
1429: // Acquire the lock
1430: int nbOfThreads = acquireLockAndCheckNbOfThreads(
1431: totalOrderRelease, "release savepoint " + savepointName
1432: + " " + tid);
1433:
1434: // Build the list of backends that need to rollback this transaction
1435: List savepointList = getBackendsWithStartedTransaction(
1436: new Long(tid), nbOfThreads);
1437:
1438: int nbOfSavepoints = savepointList.size();
1439: ReleaseSavepointTask task = null;
1440: if (nbOfSavepoints != 0)
1441: task = new ReleaseSavepointTask(getNbToWait(nbOfThreads),
1442: nbOfThreads, tm, savepointName);
1443:
1444: // Post the task in the non-conflicting queues.
1445: synchronized (enabledBackends) {
1446: for (int i = 0; i < nbOfSavepoints; i++) {
1447: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1448: .get(i);
1449: backend.getTaskQueues()
1450: .addTaskToBackendTotalOrderQueue(task);
1451: }
1452: }
1453:
1454: // Release the lock
1455: backendListLock.releaseRead();
1456:
1457: // Unblock next query from total order queue
1458: if (totalOrderRelease != null)
1459: removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease);
1460:
1461: waitForReleaseSavepoint(task, savepointName, tm.getTimeout());
1462: }
1463:
1464: /**
1465: * Set a savepoint to a transaction.
1466: *
1467: * @param tm The transaction marker metadata
1468: * @param savepointName The name of the new savepoint
1469: * @throws SQLException if an error occurs
1470: */
1471: public void setSavepoint(TransactionMetaData tm,
1472: String savepointName) throws SQLException {
1473: long tid = tm.getTransactionId();
1474:
1475: // Ordering for distributed virtual database
1476: DistributedSetSavepoint totalOrderSavepoint = null;
1477: boolean canTakeReadLock = false;
1478: if (vdb.getTotalOrderQueue() != null) {
1479: totalOrderSavepoint = new DistributedSetSavepoint(tm
1480: .getLogin(), tid, savepointName);
1481: // Total ordering mainly for distributed virtual databases.
1482: // If waitForTotalOrder returns true then the query has been scheduled in
1483: // total order and there is no need to take a write lock later to resolve
1484: // potential conflicts.
1485: canTakeReadLock = waitForTotalOrder(totalOrderSavepoint,
1486: false);
1487: if (!canTakeReadLock)
1488: // This is a local commit no total order info
1489: totalOrderSavepoint = null;
1490: }
1491:
1492: // Update the recovery log
1493: if (recoveryLog != null)
1494: recoveryLog.logSetSavepoint(tm, savepointName);
1495:
1496: // Acquire the lock
1497: String requestDescription = "set savepoint " + savepointName
1498: + " " + tid;
1499: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1500: requestDescription);
1501:
1502: SavepointTask task = null;
1503:
1504: // Post the task in the non-conflicting queues of all backends.
1505: synchronized (enabledBackends) {
1506: if (nbOfThreads != 0) {
1507: task = new SavepointTask(getNbToWait(nbOfThreads),
1508: nbOfThreads, tm, savepointName);
1509: for (int i = 0; i < nbOfThreads; i++) {
1510: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1511: .get(i);
1512: backend.getTaskQueues()
1513: .addTaskToBackendTotalOrderQueue(task);
1514: }
1515: }
1516: }
1517:
1518: // Release the lock
1519: backendListLock.releaseRead();
1520:
1521: // Unblock next query from total order queue
1522: if (totalOrderSavepoint != null)
1523: removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint);
1524:
1525: waitForSavepoint(task, savepointName, tm.getTimeout());
1526: }
1527:
1528: /**
1529: * Wait for a <code>CommitTask</code> to complete
1530: *
1531: * @param task the <code>CommitTask</code>
1532: * @param timeout the timeout (in milliseconds) for task completion
1533: * @throws SQLException if an error occurs while completing the commit task
1534: */
1535: // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1536: // FIXME the timeout is a field of the CommitTask but the getter is not
1537: // defined
1538: private void waitForCommit(CommitTask task, long timeout)
1539: throws SQLException {
1540: // Check if someone had something to commit
1541: if (task == null)
1542: return;
1543:
1544: long tid = task.getTransactionId();
1545:
1546: synchronized (task) {
1547: if (!task.hasCompleted())
1548: waitForTaskCompletion(timeout, "commit " + tid, task);
1549:
1550: if (task.getSuccess() == 0) { // All tasks failed
1551: List exceptions = task.getExceptions();
1552: if (exceptions == null)
1553: throw new SQLException(Translate.get(
1554: "loadbalancer.commit.all.failed", tid));
1555: else {
1556: SQLException ex = SQLExceptionFactory
1557: .getSQLException(exceptions, Translate.get(
1558: "loadbalancer.commit.failed.stack",
1559: tid)
1560: + "\n");
1561: logger.error(ex.getMessage());
1562: throw ex;
1563: }
1564: }
1565: }
1566: }
1567:
1568: /**
1569: * Wait for a <code>RollbackTask</code> to complete
1570: *
1571: * @param task the <code>RollbackTask</code>
1572: * @param timeout the timeout (in milliseconds) for task completion
1573: * @throws SQLException if an error occurs while completing the rollback task
1574: */
1575: // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1576: // FIXME the timeout is a field of the RollbacTask but the getter is not
1577: // defined
1578: private void waitForRollback(RollbackTask task, long timeout)
1579: throws SQLException {
1580: // Check if someone had something to rollback
1581: if (task == null)
1582: return;
1583:
1584: long tid = task.getTransactionId();
1585:
1586: synchronized (task) {
1587: if (!task.hasCompleted())
1588: waitForTaskCompletion(timeout, "rollback " + tid, task);
1589:
1590: if (task.getSuccess() == 0) { // All tasks failed
1591: List exceptions = task.getExceptions();
1592: if (exceptions == null)
1593: throw new SQLException(Translate.get(
1594: "loadbalancer.rollback.all.failed", tid));
1595: else {
1596: SQLException ex = SQLExceptionFactory
1597: .getSQLException(
1598: exceptions,
1599: Translate
1600: .get(
1601: "loadbalancer.rollback.failed.stack",
1602: tid)
1603: + "\n");
1604: logger.error(ex.getMessage());
1605: throw ex;
1606: }
1607: }
1608: }
1609: }
1610:
1611: /**
1612: * Wait for a <code>RollbackToSavepointTask</code> to complete
1613: *
1614: * @param task the <code>RollbackToSavepointTask</code>
1615: * @param savepoint the savepoint name
1616: * @param timeout the timeout (in milliseconds) for task completion
1617: * @throws SQLException if an error occurs while completing the rollback to
1618: * savepoint task
1619: */
1620: // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1621: // FIXME the timeout is a field of the RollbackToSavepointTask but the getter
1622: // is not defined
1623: // FIXME the savepoint is a field of the RollbackToSavepointTask but the
1624: // getter is not defined
1625: private void waitForRollbackToSavepoint(
1626: RollbackToSavepointTask task, String savepoint, long timeout)
1627: throws SQLException {
1628: // Check if someone had something to rollback
1629: if (task == null)
1630: return;
1631:
1632: long tid = task.getTransactionId();
1633:
1634: synchronized (task) {
1635: if (!task.hasCompleted())
1636: waitForTaskCompletion(timeout, "rollback " + savepoint
1637: + " " + tid, task);
1638:
1639: if (task.getSuccess() == 0) { // All tasks failed
1640: List exceptions = task.getExceptions();
1641: if (exceptions == null)
1642: throw new SQLException(
1643: Translate
1644: .get(
1645: "loadbalancer.rollbacksavepoint.all.failed",
1646: savepoint, String
1647: .valueOf(tid)));
1648: else {
1649: SQLException ex = SQLExceptionFactory
1650: .getSQLException(
1651: exceptions,
1652: Translate
1653: .get(
1654: "loadbalancer.rollbacksavepoint.failed.stack",
1655: savepoint,
1656: String.valueOf(tid))
1657: + "\n");
1658: logger.error(ex.getMessage());
1659: throw ex;
1660: }
1661: }
1662: }
1663: }
1664:
1665: /**
1666: * Wait for a <code>ReleaseSavepointTask</code> to complete
1667: *
1668: * @param task the <code>ReleaseSavepointTask</code>
1669: * @param savepoint the savepoint name
1670: * @param timeout the timeout (in milliseconds) for task completion
1671: * @throws SQLException if an error occurs while completing the release
1672: * savepoint task
1673: */
1674: // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1675: // FIXME the timeout is a field of the ReleaseSavepointTask but the getter is
1676: // not defined
1677: // FIXME the savepoint is a field of the ReleaseSavepointTask but the getter
1678: // is not defined
1679: private void waitForReleaseSavepoint(ReleaseSavepointTask task,
1680: String savepoint, long timeout) throws SQLException {
1681: // Check if someone had something to release
1682: if (task == null)
1683: return;
1684:
1685: long tid = task.getTransactionId();
1686:
1687: synchronized (task) {
1688: if (!task.hasCompleted())
1689: waitForTaskCompletion(timeout, "release savepoint "
1690: + savepoint + " " + tid, task);
1691:
1692: if (task.getSuccess() == 0) { // All tasks failed
1693: List exceptions = task.getExceptions();
1694: if (exceptions == null)
1695: throw new SQLException(Translate.get(
1696: "loadbalancer.releasesavepoint.all.failed",
1697: savepoint, String.valueOf(tid)));
1698: else {
1699: SQLException ex = SQLExceptionFactory
1700: .getSQLException(
1701: exceptions,
1702: Translate
1703: .get(
1704: "loadbalancer.releasesavepoint.failed.stack",
1705: savepoint,
1706: String.valueOf(tid))
1707: + "\n");
1708: logger.error(ex.getMessage());
1709: throw ex;
1710: }
1711: }
1712: }
1713: }
1714:
1715: /**
1716: * Wait for a <code>SavepointTask</code> to complete
1717: *
1718: * @param task the <code>SavepointTask</code>
1719: * @param savepoint the savepoint name
1720: * @param timeout the timeout (in milliseconds) for task completion
1721: * @throws SQLException if an error occurs while completing the savepoint task
1722: */
1723: // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1724: // FIXME the timeout is a field of the SavepointTask but the getter is not
1725: // defined
1726: // FIXME the savepoint is a field of the SavepointTask but the getter is not
1727: // defined
1728: private void waitForSavepoint(SavepointTask task, String savepoint,
1729: long timeout) throws SQLException {
1730: // Check if someone had something to release
1731: if (task == null)
1732: return;
1733:
1734: long tid = task.getTransactionId();
1735:
1736: synchronized (task) {
1737: if (!task.hasCompleted())
1738: waitForTaskCompletion(timeout, "set savepoint "
1739: + savepoint + " " + tid, task);
1740:
1741: if (task.getSuccess() == 0) { // All tasks failed
1742: List exceptions = task.getExceptions();
1743: if (exceptions == null)
1744: throw new SQLException(Translate.get(
1745: "loadbalancer.setsavepoint.all.failed",
1746: savepoint, String.valueOf(tid)));
1747: else {
1748: SQLException ex = SQLExceptionFactory
1749: .getSQLException(
1750: exceptions,
1751: Translate
1752: .get(
1753: "loadbalancer.setsavepoint.failed.stack",
1754: savepoint,
1755: String.valueOf(tid))
1756: + "\n");
1757: logger.error(ex.getMessage());
1758: throw ex;
1759: }
1760: }
1761: }
1762: }
1763:
1764: //
1765: // Utility functions
1766: //
1767: /**
1768: * Check in which queue the task should be posted and atomically posts the
1769: * task in the queue of all backends.
1770: *
1771: * @param task the task to post
1772: * @param writeList backend list to post the task to
1773: * @param removeFromTotalOrderQueue true if the query must be removed from the
1774: * total order queue
1775: */
1776: private void atomicTaskPostInQueueAndReleaseLock(
1777: AbstractRequest request, AbstractTask task, List writeList,
1778: boolean removeFromTotalOrderQueue) {
1779: synchronized (enabledBackends) {
1780: int nbOfThreads = writeList.size();
1781: for (int i = 0; i < nbOfThreads; i++) {
1782: BackendTaskQueues queues = ((DatabaseBackend) writeList
1783: .get(i)).getTaskQueues();
1784: queues.addTaskToBackendTotalOrderQueue(task);
1785: }
1786: }
1787:
1788: backendListLock.releaseRead();
1789:
1790: // Unblock next query from total order queue
1791: if (removeFromTotalOrderQueue) {
1792: removeObjectFromAndNotifyTotalOrderQueue(request);
1793: }
1794: }
1795:
1796: /**
1797: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1798: * long)
1799: */
1800: public void closePersistentConnection(String login,
1801: long persistentConnectionId) throws SQLException {
1802: /*
1803: * We assume a synchronous execution and connection closing can only come
1804: * after all requests have been executed in that connection. We post to all
1805: * backends and let the task deal with whether that backend had a persistent
1806: * connection or not.
1807: */
1808:
1809: String requestDescription = "closing persistent connection "
1810: + persistentConnectionId;
1811: int nbOfThreads = 0;
1812:
1813: DistributedClosePersistentConnection totalOrderQueueObject = null;
1814:
1815: boolean removefromTotalOrder = false;
1816: if (vdb.getTotalOrderQueue() != null) {
1817: totalOrderQueueObject = new DistributedClosePersistentConnection(
1818: login, persistentConnectionId);
1819: removefromTotalOrder = waitForTotalOrder(
1820: totalOrderQueueObject, true);
1821: }
1822:
1823: ClosePersistentConnectionTask task = null;
1824: try {
1825: nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1826: requestDescription);
1827:
1828: task = new ClosePersistentConnectionTask(
1829: getNbToWait(nbOfThreads), nbOfThreads, login,
1830: persistentConnectionId);
1831:
1832: // Post the task in the non-conflicting queues.
1833: synchronized (enabledBackends) {
1834: for (int i = 0; i < nbOfThreads; i++) {
1835: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1836: .get(i);
1837: backend.getTaskQueues()
1838: .addTaskToBackendTotalOrderQueue(task);
1839: }
1840: }
1841:
1842: // Release the lock
1843: backendListLock.releaseRead();
1844:
1845: if (removefromTotalOrder)
1846: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1847: totalOrderQueueObject = null;
1848:
1849: synchronized (task) {
1850: if (!task.hasCompleted())
1851: try {
1852: waitForTaskCompletion(0, requestDescription,
1853: task);
1854: } catch (SQLException ignore) {
1855: }
1856: }
1857: } finally {
1858: if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1859: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1860: }
1861:
1862: if (logger.isDebugEnabled())
1863: logger.debug(requestDescription + " completed on "
1864: + nbOfThreads + " backends.");
1865: }
1866: }
1867:
1868: /**
1869: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1870: * long)
1871: */
1872: public void openPersistentConnection(String login,
1873: long persistentConnectionId) throws SQLException {
1874: String requestDescription = "opening persistent connection "
1875: + persistentConnectionId;
1876: int nbOfThreads = 0;
1877:
1878: DistributedOpenPersistentConnection totalOrderQueueObject = null;
1879: if (vdb.getTotalOrderQueue() != null) {
1880: totalOrderQueueObject = new DistributedOpenPersistentConnection(
1881: login, persistentConnectionId);
1882: waitForTotalOrder(totalOrderQueueObject, true);
1883: }
1884:
1885: OpenPersistentConnectionTask task = null;
1886: try {
1887: nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1888: requestDescription);
1889:
1890: task = new OpenPersistentConnectionTask(
1891: getNbToWait(nbOfThreads), nbOfThreads, login,
1892: persistentConnectionId);
1893:
1894: // Post the task in the non-conflicting queues.
1895: synchronized (enabledBackends) {
1896: for (int i = 0; i < nbOfThreads; i++) {
1897: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1898: .get(i);
1899: backend.getTaskQueues()
1900: .addTaskToBackendTotalOrderQueue(task);
1901: }
1902: }
1903:
1904: // Release the lock
1905: backendListLock.releaseRead();
1906:
1907: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1908: totalOrderQueueObject = null;
1909:
1910: synchronized (task) {
1911: if (!task.hasCompleted())
1912: try {
1913: waitForTaskCompletion(0, requestDescription,
1914: task);
1915: } catch (SQLException ignore) {
1916: }
1917: }
1918: } finally {
1919: if (totalOrderQueueObject != null) { // NoMoreBackendException occured
1920: removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1921: }
1922:
1923: if (logger.isDebugEnabled())
1924: logger.debug(requestDescription + " completed on "
1925: + nbOfThreads + " backends.");
1926: }
1927: }
1928:
1929: /**
1930: * Enables a Backend that was previously disabled.
1931: * <p>
1932: * Ask the corresponding connection manager to initialize the connections if
1933: * needed.
1934: * <p>
1935: * No sanity checks are performed by this function.
1936: *
1937: * @param db the database backend to enable
1938: * @param writeEnabled True if the backend must be enabled for writes
1939: * @throws SQLException if an error occurs
1940: */
1941: public synchronized void enableBackend(DatabaseBackend db,
1942: boolean writeEnabled) throws SQLException {
1943: if (!db.isInitialized())
1944: db.initializeConnections();
1945:
1946: if (writeEnabled && db.isWriteCanBeEnabled()) {
1947: // Create the new backend task queues
1948: db.setTaskQueues(new BackendTaskQueues(db,
1949: waitForCompletionPolicy, this .vdb
1950: .getRequestManager()));
1951: db.startWorkerThreads(this );
1952: db.enableWrite();
1953: }
1954:
1955: db.enableRead();
1956: try {
1957: backendListLock.acquireWrite();
1958: } catch (InterruptedException e) {
1959: logger
1960: .error(
1961: "Error while acquiring write lock in enableBackend",
1962: e);
1963: }
1964: synchronized (enabledBackends) {
1965: enabledBackends.add(db);
1966: }
1967: backendListLock.releaseWrite();
1968: }
1969:
1970: /**
1971: * Disables a backend that was previously enabled.
1972: * <p>
1973: * Ask the corresponding connection manager to finalize the connections if
1974: * needed.
1975: * <p>
1976: * No sanity checks are performed by this function.
1977: *
1978: * @param db the database backend to disable
1979: * @param forceDisable true if disabling must be forced on the backend
1980: * @throws SQLException if an error occurs
1981: */
1982: public void disableBackend(DatabaseBackend db, boolean forceDisable)
1983: throws SQLException {
1984: if (!db.disable()) {
1985: // Another thread has already started the disable process
1986: return;
1987: }
1988: synchronized (this ) {
1989: try {
1990: backendListLock.acquireWrite();
1991: } catch (InterruptedException e) {
1992: logger
1993: .error(
1994: "Error while acquiring write lock in enableBackend",
1995: e);
1996: }
1997:
1998: try {
1999: synchronized (enabledBackends) {
2000: enabledBackends.remove(db);
2001: if (enabledBackends.isEmpty()) {
2002: // Cleanup schema for any remaining locks
2003: this .vdb.getRequestManager().setDatabaseSchema(
2004: null, false);
2005: }
2006: }
2007:
2008: if (!forceDisable)
2009: terminateThreadsAndConnections(db);
2010: } finally {
2011: backendListLock.releaseWrite();
2012: }
2013:
2014: if (forceDisable) {
2015: db.shutdownConnectionManagers();
2016: terminateThreadsAndConnections(db);
2017: }
2018:
2019: // sanity check on backend's active transaction
2020: if (!db.getActiveTransactions().isEmpty()) {
2021: if (logger.isWarnEnabled()) {
2022: logger.warn("Active transactions after backend "
2023: + db.getName() + " is disabled: "
2024: + db.getActiveTransactions());
2025: }
2026: }
2027:
2028: }
2029: }
2030:
2031: private void terminateThreadsAndConnections(DatabaseBackend db)
2032: throws SQLException {
2033: db.terminateWorkerThreads();
2034:
2035: if (db.isInitialized())
2036: db.finalizeConnections();
2037: }
2038:
2039: /**
2040: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2041: */
2042: public String getXmlImpl() {
2043: StringBuffer info = new StringBuffer();
2044: info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2045: if (createTablePolicy != null)
2046: info.append(createTablePolicy.getXml());
2047: if (waitForCompletionPolicy != null)
2048: info.append(waitForCompletionPolicy.getXml());
2049: if (macroHandler != null)
2050: info.append(macroHandler.getXml());
2051: this .getRaidb2Xml();
2052: info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2053: return info.toString();
2054: }
2055:
2056: /**
* Returns XML formatted information about this RAIDb-2 load balancer.
*
* @return XML formatted string
2060: */
2061: public abstract String getRaidb2Xml();
2062:
2063: }
|