0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Contact: sequoia@continuent.org
0007: *
0008: * Licensed under the Apache License, Version 2.0 (the "License");
0009: * you may not use this file except in compliance with the License.
0010: * You may obtain a copy of the License at
0011: *
0012: * http://www.apache.org/licenses/LICENSE-2.0
0013: *
0014: * Unless required by applicable law or agreed to in writing, software
0015: * distributed under the License is distributed on an "AS IS" BASIS,
0016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017: * See the License for the specific language governing permissions and
0018: * limitations under the License.
0019: *
0020: * Initial developer(s): Emmanuel Cecchet.
0021: * Contributor(s): Jaco Swart, Jean-Bernard van Zuylen
0022: */package org.continuent.sequoia.controller.loadbalancer.raidb0;
0023:
0024: import java.sql.Connection;
0025: import java.sql.SQLException;
0026: import java.util.ArrayList;
0027: import java.util.Collection;
0028: import java.util.Iterator;
0029: import java.util.List;
0030:
0031: import org.continuent.sequoia.common.exceptions.BadConnectionException;
0032: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0033: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0034: import org.continuent.sequoia.common.exceptions.NotImplementedException;
0035: import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
0036: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0037: import org.continuent.sequoia.common.i18n.Translate;
0038: import org.continuent.sequoia.common.log.Trace;
0039: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0040: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0041: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0042: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0043: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0044: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0045: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0046: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0047: import org.continuent.sequoia.controller.connection.PooledConnection;
0048: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0049: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0050: import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
0051: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0052: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableException;
0053: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
0054: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableRule;
0055: import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
0056: import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
0057: import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
0058: import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
0059: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0060: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
0061: import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
0062: import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0063: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0064: import org.continuent.sequoia.controller.requests.AbstractRequest;
0065: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0066: import org.continuent.sequoia.controller.requests.ParsingGranularities;
0067: import org.continuent.sequoia.controller.requests.SelectRequest;
0068: import org.continuent.sequoia.controller.requests.StoredProcedure;
0069: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0070:
0071: /**
0072: * RAIDb-0: database partitioning.
0073: * <p>
0074: * The requests are sent to the backend nodes hosting the tables needed to
0075: * execute the request. If no backend has the needed tables to perform a
0076: * request, it will fail.
0077: *
0078: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0079: * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
0080: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0081: * </a>
0082: * @version 1.0
0083: */
0084: public class RAIDb0 extends AbstractLoadBalancer {
0085: //
0086: // How the code is organized?
0087: // 1. Member variables
0088: // 2. Constructor(s)
0089: // 3. Request handling
0090: // 4. Transaction handling
0091: // 5. Backend management
0092: // 6. Debug/Monitoring
0093: //
0094:
0095: private CreateTablePolicy createTablePolicy;
0096: protected static Trace logger = Trace
0097: .getLogger("org.continuent.sequoia.controller.loadbalancer.RAIDb0");
0098:
0099: /*
0100: * Constructors
0101: */
0102:
0103: /**
0104: * Creates a new RAIDb-0 request load balancer.
0105: *
0106: * @param vdb the virtual database this load balancer belongs to.
0107: * @param createTablePolicy the policy defining how 'create table' statements
0108: * should be handled
0109: * @throws Exception if an error occurs
0110: */
0111: public RAIDb0(VirtualDatabase vdb,
0112: CreateTablePolicy createTablePolicy) throws Exception {
0113: super (vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE);
0114: this .createTablePolicy = createTablePolicy;
0115: this .waitForCompletionPolicy = new WaitForCompletionPolicy(
0116: WaitForCompletionPolicy.ALL, false, 0);
0117: }
0118:
0119: /*
0120: * Request Handling
0121: */
0122:
0123: /**
0124: * Performs a read request on the backend that has the needed tables to
0125: * executes the request.
0126: *
0127: * @param request an <code>SelectRequest</code>
0128: * @param metadataCache the metadataCache if any or null
0129: * @return the corresponding <code>java.sql.ResultSet</code>
0130: * @exception SQLException if an error occurs
0131: * @see AbstractLoadBalancer#statementExecuteQuery(SelectRequest,
0132: * MetadataCache)
0133: */
0134: public ControllerResultSet statementExecuteQuery(
0135: SelectRequest request, MetadataCache metadataCache)
0136: throws SQLException {
0137: try {
0138: vdb.acquireReadLockBackendLists(); // Acquire read lock
0139: } catch (InterruptedException e) {
0140: String msg = Translate.get(
0141: "loadbalancer.backendlist.acquire.readlock.failed",
0142: e);
0143: logger.error(msg);
0144: throw new SQLException(msg);
0145: }
0146:
0147: try {
0148: ControllerResultSet rs = null;
0149: Collection fromTables = request.getFrom();
0150:
0151: if (fromTables == null)
0152: throw new SQLException(Translate.get(
0153: "loadbalancer.from.not.found", request
0154: .getSqlShortForm(vdb
0155: .getSqlShortFormLength())));
0156:
0157: // Find the backend that has the needed tables
0158: ArrayList backends = vdb.getBackends();
0159: int size = backends.size();
0160: int enabledBackends = 0;
0161:
0162: DatabaseBackend backend = null;
0163: // The backend that will execute the query
0164: for (int i = 0; i < size; i++) {
0165: backend = (DatabaseBackend) backends.get(i);
0166: if (backend.isReadEnabled())
0167: enabledBackends++;
0168: if (backend.isReadEnabled()
0169: && backend.hasTables(fromTables))
0170: break;
0171: else
0172: backend = null;
0173: }
0174:
0175: if (backend == null) {
0176: if (enabledBackends == 0)
0177: throw new NoMoreBackendException(Translate.get(
0178: "loadbalancer.execute.no.backend.enabled",
0179: request.getId()));
0180: else
0181: throw new SQLException(Translate.get(
0182: "loadbalancer.backend.no.required.tables",
0183: fromTables.toString()));
0184: }
0185:
0186: if (logger.isDebugEnabled()) {
0187: logger.debug("Backend " + backend.getName()
0188: + " has all tables which are:");
0189: for (Iterator iter = fromTables.iterator(); iter
0190: .hasNext();)
0191: logger.debug(iter.next());
0192: }
0193:
0194: // Execute the request on the chosen backend
0195: try {
0196: rs = executeRequestOnBackend(request, backend,
0197: metadataCache);
0198: } catch (SQLException se) {
0199: String msg = Translate.get(
0200: "loadbalancer.request.failed", new String[] {
0201: String.valueOf(request.getId()),
0202: se.getMessage() });
0203: if (logger.isInfoEnabled())
0204: logger.info(msg);
0205: throw new SQLException(msg);
0206: }
0207:
0208: return rs;
0209: } catch (RuntimeException e) {
0210: String msg = Translate.get("loadbalancer.request.failed",
0211: new String[] {
0212: request.getSqlShortForm(vdb
0213: .getSqlShortFormLength()),
0214: e.getMessage() });
0215: logger.fatal(msg, e);
0216: throw new SQLException(msg);
0217: } finally {
0218: vdb.releaseReadLockBackendLists(); // Release the lock
0219: }
0220: }
0221:
0222: /**
0223: * Performs a write request on the backend that has the needed tables to
0224: * executes the request.
0225: *
0226: * @param request an <code>AbstractWriteRequest</code>
0227: * @return number of rows affected by the request
0228: * @exception SQLException if an error occurs
0229: */
0230: public ExecuteUpdateResult statementExecuteUpdate(
0231: AbstractWriteRequest request) throws SQLException {
0232: // Handle macros
0233: handleMacros(request);
0234:
0235: try {
0236: vdb.acquireReadLockBackendLists(); // Acquire read lock
0237: } catch (InterruptedException e) {
0238: String msg = Translate.get(
0239: "loadbalancer.backendlist.acquire.readlock.failed",
0240: e);
0241: logger.error(msg);
0242: throw new SQLException(msg);
0243: }
0244:
0245: boolean success = false;
0246: try {
0247: // Log lazy begin if needed
0248: if (request.isLazyTransactionStart())
0249: this .vdb.getRequestManager().logLazyTransactionBegin(
0250: request.getTransactionId());
0251:
0252: // Log request
0253: if (recoveryLog != null)
0254: recoveryLog.logRequestExecuting(request);
0255:
0256: String table = request.getTableName();
0257: AbstractConnectionManager cm = null;
0258:
0259: if (table == null)
0260: throw new SQLException(Translate.get(
0261: "loadbalancer.request.target.table.not.found",
0262: request.getSqlShortForm(vdb
0263: .getSqlShortFormLength())));
0264:
0265: // Find the backend that has the needed table
0266: ArrayList backends = vdb.getBackends();
0267: int size = backends.size();
0268:
0269: DatabaseBackend backend = null;
0270: // The backend that will execute the query
0271: if (request.isCreate()) { // Choose the backend according to the defined policy
0272: CreateTableRule rule = createTablePolicy
0273: .getTableRule(request.getTableName());
0274: if (rule == null)
0275: rule = createTablePolicy.getDefaultRule();
0276:
0277: // Ask the rule to pickup a backend
0278: ArrayList choosen;
0279: try {
0280: choosen = rule.getBackends(backends);
0281: } catch (CreateTableException e) {
0282: throw new SQLException(Translate.get(
0283: "loadbalancer.create.table.rule.failed", e
0284: .getMessage()));
0285: }
0286:
0287: // Get the connection manager from the chosen backend
0288: if (choosen != null)
0289: backend = (DatabaseBackend) choosen.get(0);
0290: if (backend != null)
0291: cm = backend.getConnectionManager(request
0292: .getLogin());
0293: } else { // Find the backend that has the table
0294: for (int i = 0; i < size; i++) {
0295: backend = (DatabaseBackend) backends.get(i);
0296: if ((backend.isWriteEnabled() || backend
0297: .isDisabling())
0298: && backend.hasTable(table)) {
0299: cm = backend.getConnectionManager(request
0300: .getLogin());
0301: break;
0302: }
0303: }
0304: }
0305:
0306: // Sanity check
0307: if (cm == null)
0308: throw new SQLException(Translate
0309: .get("loadbalancer.backend.no.required.table",
0310: table));
0311:
0312: // Ok, let's execute the query
0313:
0314: if (request.isAutoCommit()) {
0315: // We do not execute request outside the already open transactions if we
0316: // are disabling the backend.
0317: if (!backend.canAcceptTasks(request))
0318: throw new SQLException(Translate.get(
0319: "loadbalancer.backend.is.disabling",
0320: new String[] {
0321: request.getSqlShortForm(vdb
0322: .getSqlShortFormLength()),
0323: backend.getName() }));
0324:
0325: // Use a connection just for this request
0326: PooledConnection c = null;
0327: try {
0328: c = cm.retrieveConnectionInAutoCommit(request);
0329: } catch (UnreachableBackendException e1) {
0330: logger
0331: .error(Translate
0332: .get(
0333: "loadbalancer.backend.disabling.unreachable",
0334: backend.getName()));
0335: disableBackend(backend, true);
0336: throw new SQLException(Translate.get(
0337: "loadbalancer.backend.unreacheable",
0338: backend.getName()));
0339: }
0340:
0341: // Sanity check
0342: if (c == null)
0343: throw new SQLException(Translate.get(
0344: "loadbalancer.backend.no.connection",
0345: backend.getName()));
0346:
0347: ExecuteUpdateResult result;
0348: try {
0349: result = executeStatementExecuteUpdateOnBackend(
0350: request, backend, null, c);
0351: } catch (Exception e) {
0352: throw new SQLException(Translate.get(
0353: "loadbalancer.request.failed",
0354: new String[] {
0355: request.getSqlShortForm(vdb
0356: .getSqlShortFormLength()),
0357: e.getMessage() }));
0358: } finally {
0359: cm.releaseConnectionInAutoCommit(request, c);
0360: }
0361: if (logger.isDebugEnabled())
0362: logger.debug(Translate.get(
0363: "loadbalancer.execute.on", new String[] {
0364: String.valueOf(request.getId()),
0365: backend.getName() }));
0366: return result;
0367: } else { // Inside a transaction
0368: Connection c;
0369: long tid = request.getTransactionId();
0370:
0371: try {
0372: c = backend
0373: .getConnectionForTransactionAndLazyBeginIfNeeded(
0374: request, cm);
0375: } catch (UnreachableBackendException e1) {
0376: logger
0377: .error(Translate
0378: .get(
0379: "loadbalancer.backend.disabling.unreachable",
0380: backend.getName()));
0381: disableBackend(backend, true);
0382: throw new SQLException(Translate.get(
0383: "loadbalancer.backend.unreacheable",
0384: backend.getName()));
0385: } catch (NoTransactionStartWhenDisablingException e) {
0386: String msg = Translate.get(
0387: "loadbalancer.backend.is.disabling",
0388: new String[] {
0389: request.getSqlShortForm(vdb
0390: .getSqlShortFormLength()),
0391: backend.getName() });
0392: logger.error(msg);
0393: throw new SQLException(msg);
0394: }
0395:
0396: // Sanity check
0397: if (c == null)
0398: throw new SQLException(Translate.get(
0399: "loadbalancer.unable.retrieve.connection",
0400: new String[] { String.valueOf(tid),
0401: backend.getName() }));
0402:
0403: // Execute the query
0404: ExecuteUpdateResult result;
0405: try {
0406: result = executeStatementExecuteUpdateOnBackend(
0407: request, backend, null,
0408: cm.retrieveConnectionForTransaction(tid));
0409: } catch (Exception e) {
0410: throw new SQLException(Translate.get(
0411: "loadbalancer.request.failed",
0412: new String[] {
0413: request.getSqlShortForm(vdb
0414: .getSqlShortFormLength()),
0415: e.getMessage() }));
0416: }
0417: if (logger.isDebugEnabled())
0418: logger.debug(Translate.get(
0419: "loadbalancer.execute.on", new String[] {
0420: String.valueOf(request.getId()),
0421: backend.getName() }));
0422: success = true;
0423: return result;
0424: }
0425: } catch (RuntimeException e) {
0426: String msg = Translate.get("loadbalancer.request.failed",
0427: new String[] {
0428: request.getSqlShortForm(vdb
0429: .getSqlShortFormLength()),
0430: e.getMessage() });
0431: logger.fatal(msg, e);
0432: throw new SQLException(msg);
0433: } finally {
0434: vdb.releaseReadLockBackendLists(); // Release the lock
0435: if (!success)
0436: recoveryLog.logRequestCompletion(request.getLogId(),
0437: false, request.getExecTimeInMs());
0438: }
0439: }
0440:
0441: /**
0442: * @see AbstractLoadBalancer#statementExecuteUpdateWithKeys(AbstractWriteRequest,
0443: * MetadataCache)
0444: */
0445: public GeneratedKeysResult statementExecuteUpdateWithKeys(
0446: AbstractWriteRequest request, MetadataCache metadataCache)
0447: throws SQLException {
0448: // Handle macros
0449: handleMacros(request);
0450:
0451: try {
0452: vdb.acquireReadLockBackendLists(); // Acquire
0453: // read
0454: // lock
0455: } catch (InterruptedException e) {
0456: String msg = Translate.get(
0457: "loadbalancer.backendlist.acquire.readlock.failed",
0458: e);
0459: logger.error(msg);
0460: throw new SQLException(msg);
0461: }
0462:
0463: boolean success = false;
0464: try {
0465: // Log lazy begin if needed
0466: if (request.isLazyTransactionStart())
0467: this .vdb.getRequestManager().logLazyTransactionBegin(
0468: request.getTransactionId());
0469:
0470: // Log request
0471: if (recoveryLog != null)
0472: recoveryLog.logRequestExecuting(request);
0473:
0474: String table = request.getTableName();
0475: AbstractConnectionManager cm = null;
0476:
0477: if (table == null)
0478: throw new SQLException(Translate.get(
0479: "loadbalancer.request.target.table.not.found",
0480: request.getSqlShortForm(vdb
0481: .getSqlShortFormLength())));
0482:
0483: // Find the backend that has the needed table
0484: ArrayList backends = vdb.getBackends();
0485: int size = backends.size();
0486:
0487: DatabaseBackend backend = null;
0488: // The backend that will execute the query
0489: if (request.isCreate()) { // Choose the backend according to the defined policy
0490: CreateTableRule rule = createTablePolicy
0491: .getTableRule(request.getTableName());
0492: if (rule == null)
0493: rule = createTablePolicy.getDefaultRule();
0494:
0495: // Ask the rule to pickup a backend
0496: ArrayList choosen;
0497: try {
0498: choosen = rule.getBackends(backends);
0499: } catch (CreateTableException e) {
0500: throw new SQLException(Translate.get(
0501: "loadbalancer.create.table.rule.failed", e
0502: .getMessage()));
0503: }
0504:
0505: // Get the connection manager from the chosen backend
0506: if (choosen != null)
0507: backend = (DatabaseBackend) choosen.get(0);
0508: if (backend != null)
0509: cm = backend.getConnectionManager(request
0510: .getLogin());
0511: } else { // Find the backend that has the table
0512: for (int i = 0; i < size; i++) {
0513: backend = (DatabaseBackend) backends.get(i);
0514: if ((backend.isWriteEnabled() || backend
0515: .isDisabling())
0516: && backend.hasTable(table)) {
0517: cm = backend.getConnectionManager(request
0518: .getLogin());
0519: break;
0520: }
0521: }
0522: }
0523:
0524: // Sanity check
0525: if (cm == null)
0526: throw new SQLException(Translate
0527: .get("loadbalancer.backend.no.required.table",
0528: table));
0529:
0530: if (!backend.getDriverCompliance()
0531: .supportGetGeneratedKeys())
0532: throw new SQLException(
0533: Translate
0534: .get(
0535: "loadbalancer.backend.autogeneratedkeys.unsupported",
0536: backend.getName()));
0537:
0538: // Ok, let's execute the query
0539:
0540: if (request.isAutoCommit()) {
0541: // We do not execute request outside the already open transactions if we
0542: // are disabling the backend.
0543: if (!backend.canAcceptTasks(request))
0544: throw new SQLException(Translate.get(
0545: "loadbalancer.backend.is.disabling",
0546: new String[] {
0547: request.getSqlShortForm(vdb
0548: .getSqlShortFormLength()),
0549: backend.getName() }));
0550:
0551: // Use a connection just for this request
0552: PooledConnection c = null;
0553: try {
0554: c = cm.retrieveConnectionInAutoCommit(request);
0555: } catch (UnreachableBackendException e1) {
0556: logger
0557: .error(Translate
0558: .get(
0559: "loadbalancer.backend.disabling.unreachable",
0560: backend.getName()));
0561: disableBackend(backend, true);
0562: throw new SQLException(Translate.get(
0563: "loadbalancer.backend.unreacheable",
0564: backend.getName()));
0565: }
0566:
0567: // Sanity check
0568: if (c == null)
0569: throw new SQLException(Translate.get(
0570: "loadbalancer.backend.no.connection",
0571: backend.getName()));
0572:
0573: // Execute Query
0574: GeneratedKeysResult result;
0575: try {
0576: result = executeStatementExecuteUpdateWithKeysOnBackend(
0577: request, backend, null, c, metadataCache);
0578: } catch (Exception e) {
0579: throw new SQLException(Translate.get(
0580: "loadbalancer.request.failed",
0581: new String[] {
0582: request.getSqlShortForm(vdb
0583: .getSqlShortFormLength()),
0584: e.getMessage() }));
0585: } finally {
0586: backend.removePendingRequest(request);
0587: cm.releaseConnectionInAutoCommit(request, c);
0588: }
0589: if (logger.isDebugEnabled())
0590: logger.debug(Translate.get(
0591: "loadbalancer.execute.on", new String[] {
0592: String.valueOf(request.getId()),
0593: backend.getName() }));
0594: return result;
0595: } else { // Inside a transaction
0596: Connection c;
0597: long tid = request.getTransactionId();
0598:
0599: try {
0600: c = backend
0601: .getConnectionForTransactionAndLazyBeginIfNeeded(
0602: request, cm);
0603: } catch (UnreachableBackendException e1) {
0604: logger
0605: .error(Translate
0606: .get(
0607: "loadbalancer.backend.disabling.unreachable",
0608: backend.getName()));
0609: disableBackend(backend, true);
0610: throw new SQLException(Translate.get(
0611: "loadbalancer.backend.unreacheable",
0612: backend.getName()));
0613: } catch (NoTransactionStartWhenDisablingException e) {
0614: String msg = Translate.get(
0615: "loadbalancer.backend.is.disabling",
0616: new String[] {
0617: request.getSqlShortForm(vdb
0618: .getSqlShortFormLength()),
0619: backend.getName() });
0620: logger.error(msg);
0621: throw new SQLException(msg);
0622: }
0623:
0624: // Sanity check
0625: if (c == null)
0626: throw new SQLException(Translate.get(
0627: "loadbalancer.unable.retrieve.connection",
0628: new String[] { String.valueOf(tid),
0629: backend.getName() }));
0630:
0631: // Execute the query
0632: GeneratedKeysResult result;
0633: try {
0634: result = executeStatementExecuteUpdateWithKeysOnBackend(
0635: request, backend, null,
0636: cm.retrieveConnectionForTransaction(tid),
0637: metadataCache);
0638: } catch (Exception e) {
0639: throw new SQLException(Translate.get(
0640: "loadbalancer.request.failed",
0641: new String[] {
0642: request.getSqlShortForm(vdb
0643: .getSqlShortFormLength()),
0644: e.getMessage() }));
0645: }
0646: if (logger.isDebugEnabled())
0647: logger.debug(Translate.get(
0648: "loadbalancer.execute.on", new String[] {
0649: String.valueOf(request.getId()),
0650: backend.getName() }));
0651: success = true;
0652: return result;
0653: }
0654: } catch (RuntimeException e) {
0655: String msg = Translate.get("loadbalancer.request.failed",
0656: new String[] {
0657: request.getSqlShortForm(vdb
0658: .getSqlShortFormLength()),
0659: e.getMessage() });
0660: logger.fatal(msg, e);
0661: throw new SQLException(msg);
0662: } finally {
0663: vdb.releaseReadLockBackendLists(); // Release the lock
0664: if (!success)
0665: recoveryLog.logRequestCompletion(request.getLogId(),
0666: false, request.getExecTimeInMs());
0667: }
0668: }
0669:
0670: /**
0671: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
0672: * MetadataCache)
0673: */
0674: public ExecuteResult statementExecute(AbstractRequest request,
0675: MetadataCache metadataCache) throws SQLException,
0676: AllBackendsFailedException {
0677: throw new NotImplementedException(
0678: "Statement.execute() is currently not supported with RAIDb-0");
0679: }
0680:
0681: /**
0682: * Execute a read request on the selected backend.
0683: *
0684: * @param request the request to execute
0685: * @param backend the backend that will execute the request
0686: * @param metadataCache the metadataCache if any or null
0687: * @return the ControllerResultSet if (!success)
0688: * recoveryLog.logRequestCompletion(request.getLogId(), false,
0689: * request.getExecTimeInMs());
0690: * @throws SQLException if an error occurs
0691: */
0692: protected ControllerResultSet executeRequestOnBackend(
0693: SelectRequest request, DatabaseBackend backend,
0694: MetadataCache metadataCache) throws SQLException {
0695: // Handle macros
0696: handleMacros(request);
0697:
0698: // Ok, we have a backend, let's execute the request
0699: AbstractConnectionManager cm = backend
0700: .getConnectionManager(request.getLogin());
0701:
0702: // Sanity check
0703: if (cm == null) {
0704: String msg = Translate.get(
0705: "loadbalancer.connectionmanager.not.found",
0706: new String[] { request.getLogin(),
0707: backend.getName() });
0708: logger.error(msg);
0709: throw new SQLException(msg);
0710: }
0711:
0712: // Execute the query
0713: if (request.isAutoCommit()) {
0714: ControllerResultSet rs = null;
0715: boolean badConnection;
0716: do {
0717: badConnection = false;
0718: // Use a connection just for this request
0719: PooledConnection c = null;
0720: try {
0721: c = cm.retrieveConnectionInAutoCommit(request);
0722: } catch (UnreachableBackendException e1) {
0723: logger
0724: .error(Translate
0725: .get(
0726: "loadbalancer.backend.disabling.unreachable",
0727: backend.getName()));
0728: disableBackend(backend, true);
0729: throw new SQLException(Translate.get(
0730: "loadbalancer.backend.unreacheable",
0731: backend.getName()));
0732: }
0733:
0734: // Sanity check
0735: if (c == null)
0736: throw new SQLException(Translate.get(
0737: "loadbalancer.backend.no.connection",
0738: backend.getName()));
0739:
0740: // Execute Query
0741: try {
0742: rs = executeStatementExecuteQueryOnBackend(request,
0743: backend, null, c.getConnection(),
0744: metadataCache);
0745: cm.releaseConnectionInAutoCommit(request, c);
0746: } catch (BadConnectionException e) { // Get rid of the bad connection
0747: cm.deleteConnection(c);
0748: badConnection = true;
0749: } catch (Throwable e) {
0750: cm.releaseConnectionInAutoCommit(request, c);
0751: throw new SQLException(
0752: Translate
0753: .get(
0754: "loadbalancer.request.failed.on.backend",
0755: new String[] {
0756: request
0757: .getSqlShortForm(vdb
0758: .getSqlShortFormLength()),
0759: backend.getName(),
0760: e.getMessage() }));
0761: }
0762: } while (badConnection);
0763: if (logger.isDebugEnabled())
0764: logger.debug(Translate.get("loadbalancer.execute.on",
0765: new String[] { String.valueOf(request.getId()),
0766: backend.getName() }));
0767: return rs;
0768: } else { // Inside a transaction
0769: Connection c;
0770: long tid = request.getTransactionId();
0771:
0772: try {
0773: c = backend
0774: .getConnectionForTransactionAndLazyBeginIfNeeded(
0775: request, cm);
0776: } catch (UnreachableBackendException e1) {
0777: logger.error(Translate.get(
0778: "loadbalancer.backend.disabling.unreachable",
0779: backend.getName()));
0780: disableBackend(backend, true);
0781: throw new SQLException(Translate.get(
0782: "loadbalancer.backend.unreacheable", backend
0783: .getName()));
0784: } catch (NoTransactionStartWhenDisablingException e) {
0785: String msg = Translate.get(
0786: "loadbalancer.backend.is.disabling",
0787: new String[] {
0788: request.getSqlShortForm(vdb
0789: .getSqlShortFormLength()),
0790: backend.getName() });
0791: logger.error(msg);
0792: throw new SQLException(msg);
0793: }
0794:
0795: // Sanity check
0796: if (c == null)
0797: throw new SQLException(Translate.get(
0798: "loadbalancer.unable.retrieve.connection",
0799: new String[] { String.valueOf(tid),
0800: backend.getName() }));
0801:
0802: // Execute Query
0803: ControllerResultSet rs = null;
0804: try {
0805: rs = executeStatementExecuteQueryOnBackend(request,
0806: backend, null, c, metadataCache);
0807: } catch (BadConnectionException e) { // Get rid of the bad connection
0808: cm.deleteConnection(tid);
0809: throw new SQLException(Translate.get(
0810: "loadbalancer.connection.failed", new String[] {
0811: String.valueOf(tid), backend.getName(),
0812: e.getMessage() }));
0813: } catch (Throwable e) {
0814: throw new SQLException(Translate.get(
0815: "loadbalancer.request.failed.on.backend",
0816: new String[] {
0817: request.getSqlShortForm(vdb
0818: .getSqlShortFormLength()),
0819: backend.getName(), e.getMessage() }));
0820: }
0821: if (logger.isDebugEnabled())
0822: logger.debug(Translate.get(
0823: "loadbalancer.execute.transaction.on",
0824: new String[] { String.valueOf(tid),
0825: String.valueOf(request.getId()),
0826: backend.getName() }));
0827: return rs;
0828: }
0829: }
0830:
0831: /**
0832: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecuteQuery(StoredProcedure,
0833: * MetadataCache)
0834: */
0835: public ControllerResultSet readOnlyCallableStatementExecuteQuery(
0836: StoredProcedure proc, MetadataCache metadataCache)
0837: throws SQLException {
0838: throw new SQLException(
0839: "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0840: }
0841:
0842: /**
0843: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
0844: * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
0845: */
0846: public ExecuteResult readOnlyCallableStatementExecute(
0847: StoredProcedure proc, MetadataCache metadataCache)
0848: throws SQLException {
0849: throw new SQLException(
0850: "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0851: }
0852:
0853: /**
0854: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
0855: * MetadataCache)
0856: */
0857: public ControllerResultSet callableStatementExecuteQuery(
0858: StoredProcedure proc, MetadataCache metadataCache)
0859: throws SQLException {
0860: throw new SQLException(
0861: "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0862: }
0863:
0864: /**
0865: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0866: */
0867: public ExecuteUpdateResult callableStatementExecuteUpdate(
0868: StoredProcedure proc) throws SQLException {
0869: throw new SQLException(
0870: "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0871: }
0872:
0873: /**
0874: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
0875: * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
0876: */
0877: public ExecuteResult callableStatementExecute(StoredProcedure proc,
0878: MetadataCache metadataCache)
0879: throws AllBackendsFailedException, SQLException {
0880: throw new SQLException(
0881: "Stored procedure calls are not supported with RAIDb-0 load balancers.");
0882: }
0883:
0884: /**
0885: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
0886: */
0887: public ControllerResultSet getPreparedStatementGetMetaData(
0888: AbstractRequest request) throws SQLException {
0889: // Choose a backend
0890: try {
0891: vdb.acquireReadLockBackendLists();
0892: } catch (InterruptedException e) {
0893: String msg = Translate.get(
0894: "loadbalancer.backendlist.acquire.readlock.failed",
0895: e);
0896: logger.error(msg);
0897: throw new SQLException(msg);
0898: }
0899:
0900: /*
0901: * The backend that will execute the query
0902: */
0903: DatabaseBackend backend = null;
0904:
0905: // Note that vdb lock is released in the finally clause of this try/catch
0906: // block
0907: try {
0908: ArrayList backends = vdb.getBackends();
0909: int size = backends.size();
0910:
0911: if (size == 0)
0912: throw new NoMoreBackendException(Translate.get(
0913: "loadbalancer.execute.no.backend.available",
0914: request.getId()));
0915:
0916: // Choose the first available backend
0917: for (int i = 0; i < size; i++) {
0918: DatabaseBackend b = (DatabaseBackend) backends.get(i);
0919: if (b.isReadEnabled()) {
0920: backend = b;
0921: break;
0922: }
0923: }
0924: } catch (Throwable e) {
0925: String msg = Translate.get(
0926: "loadbalancer.execute.find.backend.failed",
0927: new String[] {
0928: request.getSqlShortForm(vdb
0929: .getSqlShortFormLength()),
0930: e.getMessage() });
0931: logger.error(msg, e);
0932: throw new SQLException(msg);
0933: } finally {
0934: vdb.releaseReadLockBackendLists();
0935: }
0936:
0937: if (backend == null)
0938: throw new NoMoreBackendException(Translate.get(
0939: "loadbalancer.execute.no.backend.enabled", request
0940: .getId()));
0941:
0942: // Ok, we have a backend, let's execute the request
0943: AbstractConnectionManager cm = backend
0944: .getConnectionManager(request.getLogin());
0945:
0946: // Sanity check
0947: if (cm == null) {
0948: String msg = Translate.get(
0949: "loadbalancer.connectionmanager.not.found",
0950: new String[] { request.getLogin(),
0951: backend.getName() });
0952: logger.error(msg);
0953: throw new SQLException(msg);
0954: }
0955:
0956: // Execute the query
0957: if (request.isAutoCommit()) {
0958: ControllerResultSet rs = null;
0959: boolean badConnection;
0960: do {
0961: badConnection = false;
0962: // Use a connection just for this request
0963: PooledConnection c = null;
0964: try {
0965: c = cm.retrieveConnectionInAutoCommit(request);
0966: } catch (UnreachableBackendException e1) {
0967: logger
0968: .error(Translate
0969: .get(
0970: "loadbalancer.backend.disabling.unreachable",
0971: backend.getName()));
0972: disableBackend(backend, true);
0973: // Retry on a different backend
0974: return getPreparedStatementGetMetaData(request);
0975: }
0976:
0977: // Sanity check
0978: if (c == null)
0979: throw new SQLException(Translate.get(
0980: "loadbalancer.backend.no.connection",
0981: backend.getName()));
0982:
0983: // Execute Query
0984: try {
0985: rs = preparedStatementGetMetaDataOnBackend(request
0986: .getSqlOrTemplate(), backend, c
0987: .getConnection());
0988: cm.releaseConnectionInAutoCommit(request, c);
0989: } catch (SQLException e) {
0990: cm.releaseConnectionInAutoCommit(request, c);
0991: throw SQLExceptionFactory
0992: .getSQLException(
0993: e,
0994: Translate
0995: .get(
0996: "loadbalancer.request.failed.on.backend",
0997: new String[] {
0998: request
0999: .getSqlShortForm(vdb
1000: .getSqlShortFormLength()),
1001: backend
1002: .getName(),
1003: e
1004: .getMessage() }));
1005: } catch (BadConnectionException e) { // Get rid of the bad connection
1006: cm.deleteConnection(c);
1007: badConnection = true;
1008: } catch (Throwable e) {
1009: cm.releaseConnectionInAutoCommit(request, c);
1010: throw new SQLException(
1011: Translate
1012: .get(
1013: "loadbalancer.request.failed.on.backend",
1014: new String[] {
1015: request
1016: .getSqlShortForm(vdb
1017: .getSqlShortFormLength()),
1018: backend.getName(),
1019: e.getMessage() }));
1020: }
1021: } while (badConnection);
1022: if (logger.isDebugEnabled())
1023: logger.debug(Translate.get("loadbalancer.execute.on",
1024: new String[] { String.valueOf(request.getId()),
1025: backend.getName() }));
1026: return rs;
1027: } else { // Inside a transaction
1028: Connection c;
1029: long tid = request.getTransactionId();
1030:
1031: try {
1032: c = backend
1033: .getConnectionForTransactionAndLazyBeginIfNeeded(
1034: request, cm);
1035: } catch (UnreachableBackendException e1) {
1036: logger.error(Translate.get(
1037: "loadbalancer.backend.disabling.unreachable",
1038: backend.getName()));
1039: disableBackend(backend, true);
1040: throw new SQLException(Translate.get(
1041: "loadbalancer.backend.unreacheable", backend
1042: .getName()));
1043: } catch (NoTransactionStartWhenDisablingException e) {
1044: String msg = Translate.get(
1045: "loadbalancer.backend.is.disabling",
1046: new String[] {
1047: request.getSqlShortForm(vdb
1048: .getSqlShortFormLength()),
1049: backend.getName() });
1050: logger.error(msg);
1051: throw new SQLException(msg);
1052: }
1053:
1054: // Sanity check
1055: if (c == null)
1056: throw new SQLException(Translate.get(
1057: "loadbalancer.unable.retrieve.connection",
1058: new String[] { String.valueOf(tid),
1059: backend.getName() }));
1060:
1061: // Execute Query
1062: ControllerResultSet rs = null;
1063: try {
1064: rs = preparedStatementGetMetaDataOnBackend(request
1065: .getSqlOrTemplate(), backend, c);
1066: } catch (SQLException e) {
1067: throw e;
1068: } catch (BadConnectionException e) { // Connection failed, so did the transaction
1069: // Disable the backend.
1070: cm.deleteConnection(tid);
1071: String msg = Translate
1072: .get(
1073: "loadbalancer.backend.disabling.connection.failure",
1074: backend.getName());
1075: logger.error(msg);
1076: disableBackend(backend, true);
1077: throw new SQLException(msg);
1078: } catch (Throwable e) {
1079: throw new SQLException(Translate.get(
1080: "loadbalancer.request.failed.on.backend",
1081: new String[] {
1082: request.getSqlShortForm(vdb
1083: .getSqlShortFormLength()),
1084: backend.getName(), e.getMessage() }));
1085: }
1086: if (logger.isDebugEnabled())
1087: logger.debug(Translate.get(
1088: "loadbalancer.execute.transaction.on",
1089: new String[] { String.valueOf(tid),
1090: String.valueOf(request.getId()),
1091: backend.getName() }));
1092: return rs;
1093: }
1094: }
1095:
1096: /*
1097: * Transaction management
1098: */
1099:
1100: /**
1101: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1102: */
1103: public void abort(TransactionMetaData tm) throws SQLException {
1104: rollback(tm);
1105: }
1106:
1107: /**
1108: * Begins a new transaction.
1109: *
1110: * @param tm the transaction marker metadata
1111: * @throws SQLException if an error occurs
1112: */
1113: public final void begin(TransactionMetaData tm) throws SQLException {
1114: }
1115:
1116: /**
1117: * Commits a transaction.
1118: *
1119: * @param tm the transaction marker metadata
1120: * @throws SQLException if an error occurs
1121: */
1122: public void commit(TransactionMetaData tm) throws SQLException {
1123: long tid = tm.getTransactionId();
1124: Long lTid = new Long(tid);
1125:
1126: long logId = 0;
1127: // Log the request
1128: if (recoveryLog != null)
1129: logId = recoveryLog.logCommit(tm);
1130:
1131: // Acquire the lock
1132: String requestDescription = "commit " + tid;
1133: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1134: requestDescription);
1135:
1136: // Build the list of backends that need to commit this transaction
1137: ArrayList commitList = new ArrayList(nbOfThreads);
1138: for (int i = 0; i < nbOfThreads; i++) {
1139: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1140: .get(i);
1141: if (backend.isStartedTransaction(lTid))
1142: commitList.add(backend);
1143: }
1144:
1145: int nbOfThreadsToCommit = commitList.size();
1146: CommitTask task = null;
1147: if (nbOfThreadsToCommit != 0)
1148: task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1149: nbOfThreadsToCommit, tm);
1150:
1151: // Post the task in the non-conflicting queues.
1152: synchronized (enabledBackends) {
1153: for (int i = 0; i < nbOfThreadsToCommit; i++) {
1154: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1155: .get(i);
1156: backend.getTaskQueues()
1157: .addTaskToBackendTotalOrderQueue(task);
1158: }
1159: }
1160:
1161: // Release the lock
1162: backendListLock.releaseRead();
1163:
1164: // Check if someone had something to commit
1165: if (task == null)
1166: return;
1167:
1168: synchronized (task) {
1169: if (!task.hasCompleted())
1170: waitForTaskCompletion(tm.getTimeout(),
1171: requestDescription, task);
1172:
1173: if (task.getSuccess() == 0) { // All tasks failed
1174:
1175: // Notify failure in recovery log
1176: if (recoveryLog != null)
1177: recoveryLog.logRequestCompletion(logId, false, 0);
1178:
1179: List exceptions = task.getExceptions();
1180: if (exceptions == null)
1181: throw new SQLException(Translate.get(
1182: "loadbalancer.commit.all.failed", tid));
1183: else {
1184: String errorMsg = Translate.get(
1185: "loadbalancer.commit.failed.stack", tid)
1186: + "\n";
1187: SQLException ex = SQLExceptionFactory
1188: .getSQLException(exceptions, errorMsg);
1189: logger.error(ex.getMessage());
1190: throw ex;
1191: }
1192: }
1193: }
1194: }
1195:
1196: /**
1197: * Rollbacks a transaction.
1198: *
1199: * @param tm the transaction marker metadata
1200: * @throws SQLException if an error occurs
1201: */
1202: public void rollback(TransactionMetaData tm) throws SQLException {
1203: long tid = tm.getTransactionId();
1204: Long lTid = new Long(tid);
1205:
1206: long logId = 0;
1207: // Log the request
1208: if (recoveryLog != null)
1209: logId = recoveryLog.logRollback(tm);
1210:
1211: // Acquire the lock
1212: String requestDescription = "rollback " + tid;
1213: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1214: requestDescription);
1215:
1216: // Build the list of backends that need to rollback this transaction
1217: ArrayList rollbackList = new ArrayList();
1218: for (int i = 0; i < nbOfThreads; i++) {
1219: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1220: .get(i);
1221: if (backend.isStartedTransaction(lTid))
1222: rollbackList.add(backend);
1223: }
1224:
1225: int nbOfThreadsToRollback = rollbackList.size();
1226: RollbackTask task = null;
1227: if (nbOfThreadsToRollback != 0)
1228: task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1229: nbOfThreadsToRollback, tm);
1230:
1231: // Post the task in the non-conflicting queues.
1232: synchronized (enabledBackends) {
1233: for (int i = 0; i < nbOfThreadsToRollback; i++) {
1234: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1235: .get(i);
1236: backend.getTaskQueues()
1237: .addTaskToBackendTotalOrderQueue(task);
1238: }
1239: }
1240:
1241: // Release the lock
1242: backendListLock.releaseRead();
1243:
1244: // Check if someone had something to rollback
1245: if (task == null)
1246: return;
1247:
1248: synchronized (task) {
1249: if (!task.hasCompleted())
1250: waitForTaskCompletion(tm.getTimeout(),
1251: requestDescription, task);
1252:
1253: if (task.getSuccess() == 0) { // All tasks failed
1254:
1255: // Notify failure in recovery log
1256: if (recoveryLog != null)
1257: recoveryLog.logRequestCompletion(logId, false, 0);
1258:
1259: List exceptions = task.getExceptions();
1260: if (exceptions == null)
1261: throw new SQLException(Translate.get(
1262: "loadbalancer.rollback.all.failed", tid));
1263: else {
1264: String errorMsg = Translate.get(
1265: "loadbalancer.rollback.failed.stack", tid)
1266: + "\n";
1267: SQLException ex = SQLExceptionFactory
1268: .getSQLException(exceptions, errorMsg);
1269: logger.error(ex.getMessage());
1270: throw ex;
1271: }
1272: }
1273: }
1274: }
1275:
1276: /**
1277: * Rollback a transaction to a savepoint
1278: *
1279: * @param tm The transaction marker metadata
1280: * @param savepointName The name of the savepoint
1281: * @throws SQLException if an error occurs
1282: */
1283: public void rollbackToSavepoint(TransactionMetaData tm,
1284: String savepointName) throws SQLException {
1285: long tid = tm.getTransactionId();
1286: Long lTid = new Long(tid);
1287:
1288: long logId = 0;
1289: // Log the request
1290: if (recoveryLog != null)
1291: logId = recoveryLog.logRollbackToSavepoint(tm,
1292: savepointName);
1293:
1294: // Acquire the lock
1295: String requestDescription = "rollback " + savepointName + " "
1296: + tid;
1297: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1298: requestDescription);
1299:
1300: // Build the list of backends that need to rollback this transaction
1301: ArrayList rollbackList = new ArrayList();
1302: for (int i = 0; i < nbOfThreads; i++) {
1303: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1304: .get(i);
1305: if (backend.isStartedTransaction(lTid))
1306: rollbackList.add(backend);
1307: }
1308:
1309: int nbOfThreadsToRollback = rollbackList.size();
1310: RollbackToSavepointTask task = null;
1311: if (nbOfThreadsToRollback != 0)
1312: task = new RollbackToSavepointTask(
1313: getNbToWait(nbOfThreadsToRollback),
1314: nbOfThreadsToRollback, tm, savepointName);
1315:
1316: // Post the task in the non-conflicting queues.
1317: synchronized (enabledBackends) {
1318: for (int i = 0; i < nbOfThreadsToRollback; i++) {
1319: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1320: .get(i);
1321: backend.getTaskQueues()
1322: .addTaskToBackendTotalOrderQueue(task);
1323: }
1324: }
1325:
1326: // Release the lock
1327: backendListLock.releaseRead();
1328:
1329: // Check if someone had something to rollback
1330: if (task == null)
1331: return;
1332:
1333: synchronized (task) {
1334: if (!task.hasCompleted())
1335: waitForTaskCompletion(tm.getTimeout(),
1336: requestDescription, task);
1337:
1338: if (task.getSuccess() == 0) { // All tasks failed
1339:
1340: // Notify failure in recovery log
1341: if (recoveryLog != null)
1342: recoveryLog.logRequestCompletion(logId, false, 0);
1343:
1344: List exceptions = task.getExceptions();
1345: if (exceptions == null)
1346: throw new SQLException(
1347: Translate
1348: .get(
1349: "loadbalancer.rollbacksavepoint.all.failed",
1350: new String[] {
1351: savepointName,
1352: String.valueOf(tid) }));
1353: else {
1354: String errorMsg = Translate
1355: .get(
1356: "loadbalancer.rollbacksavepoint.failed.stack",
1357: new String[] { savepointName,
1358: String.valueOf(tid) })
1359: + "\n";
1360: SQLException ex = SQLExceptionFactory
1361: .getSQLException(exceptions, errorMsg);
1362: logger.error(ex.getMessage());
1363: throw ex;
1364: }
1365: }
1366: }
1367: }
1368:
1369: /**
1370: * Release a savepoint from a transaction
1371: *
1372: * @param tm The transaction marker metadata
1373: * @param savepointName The name of the savepoint ro release
1374: * @throws SQLException if an error occurs
1375: */
1376: public void releaseSavepoint(TransactionMetaData tm,
1377: String savepointName) throws SQLException {
1378: long tid = tm.getTransactionId();
1379: Long lTid = new Long(tid);
1380:
1381: long logId = 0;
1382: // Log the request
1383: if (recoveryLog != null)
1384: logId = recoveryLog.logReleaseSavepoint(tm, savepointName);
1385:
1386: // Acquire the lock
1387: String requestDescription = "release savepoint "
1388: + savepointName + " " + tid;
1389: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1390: requestDescription);
1391:
1392: // Build the list of backends that need to rollback this transaction
1393: ArrayList savepointList = new ArrayList();
1394: for (int i = 0; i < nbOfThreads; i++) {
1395: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1396: .get(i);
1397: if (backend.isStartedTransaction(lTid))
1398: savepointList.add(backend);
1399: }
1400:
1401: int nbOfSavepoints = savepointList.size();
1402: ReleaseSavepointTask task = null;
1403: if (nbOfSavepoints != 0)
1404: task = new ReleaseSavepointTask(getNbToWait(nbOfThreads),
1405: nbOfThreads, tm, savepointName);
1406:
1407: // Post the task in the non-conflicting queues.
1408: synchronized (enabledBackends) {
1409: for (int i = 0; i < nbOfSavepoints; i++) {
1410: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1411: .get(i);
1412: backend.getTaskQueues()
1413: .addTaskToBackendTotalOrderQueue(task);
1414: }
1415: }
1416:
1417: // Release the lock
1418: backendListLock.releaseRead();
1419:
1420: // Check if someone had something to release
1421: if (task == null)
1422: return;
1423:
1424: synchronized (task) {
1425: if (!task.hasCompleted())
1426: waitForTaskCompletion(tm.getTimeout(),
1427: requestDescription, task);
1428:
1429: if (task.getSuccess() == 0) { // All tasks failed
1430:
1431: // Notify failure in recovery log
1432: if (recoveryLog != null)
1433: recoveryLog.logRequestCompletion(logId, false, 0);
1434:
1435: List exceptions = task.getExceptions();
1436: if (exceptions == null)
1437: throw new SQLException(Translate.get(
1438: "loadbalancer.releasesavepoint.all.failed",
1439: new String[] { savepointName,
1440: String.valueOf(tid) }));
1441: else {
1442: String errorMsg = Translate
1443: .get(
1444: "loadbalancer.releasesavepoint.failed.stack",
1445: new String[] { savepointName,
1446: String.valueOf(tid) })
1447: + "\n";
1448: SQLException ex = SQLExceptionFactory
1449: .getSQLException(exceptions, errorMsg);
1450: logger.error(ex.getMessage());
1451: throw ex;
1452: }
1453: }
1454: }
1455: }
1456:
1457: /**
1458: * Set a savepoint to a transaction.
1459: *
1460: * @param tm The transaction marker metadata
1461: * @param savepointName The name of the new savepoint
1462: * @throws SQLException if an error occurs
1463: */
1464: public void setSavepoint(TransactionMetaData tm,
1465: String savepointName) throws SQLException {
1466: long tid = tm.getTransactionId();
1467: Long lTid = new Long(tid);
1468:
1469: long logId = 0;
1470: // Log the request
1471: if (recoveryLog != null)
1472: logId = recoveryLog.logSetSavepoint(tm, savepointName);
1473:
1474: // Acquire the lock
1475: String requestDescription = "set savepoint " + savepointName
1476: + " " + tid;
1477: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1478: requestDescription);
1479:
1480: // Build the list of backends that need to rollback this transaction
1481: ArrayList savepointList = new ArrayList();
1482: for (int i = 0; i < nbOfThreads; i++) {
1483: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1484: .get(i);
1485: if (backend.isStartedTransaction(lTid))
1486: savepointList.add(backend);
1487: }
1488:
1489: int nbOfSavepoints = savepointList.size();
1490: SavepointTask task = null;
1491: if (nbOfSavepoints != 0)
1492: task = new SavepointTask(getNbToWait(nbOfThreads),
1493: nbOfThreads, tm, savepointName);
1494:
1495: // Post the task in the non-conflicting queues.
1496: synchronized (enabledBackends) {
1497: for (int i = 0; i < nbOfSavepoints; i++) {
1498: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1499: .get(i);
1500: backend.getTaskQueues()
1501: .addTaskToBackendTotalOrderQueue(task);
1502: }
1503: }
1504:
1505: // Release the lock
1506: backendListLock.releaseRead();
1507:
1508: // Check if someone had something to release
1509: if (task == null)
1510: return;
1511:
1512: synchronized (task) {
1513: if (!task.hasCompleted())
1514: waitForTaskCompletion(tm.getTimeout(),
1515: requestDescription, task);
1516:
1517: if (task.getSuccess() == 0) { // All tasks failed
1518:
1519: // Notify failure in recovery log
1520: if (recoveryLog != null)
1521: recoveryLog.logRequestCompletion(logId, false, 0);
1522:
1523: List exceptions = task.getExceptions();
1524: if (exceptions == null)
1525: throw new SQLException(Translate.get(
1526: "loadbalancer.setsavepoint.all.failed",
1527: new String[] { savepointName,
1528: String.valueOf(tid) }));
1529: else {
1530: String errorMsg = Translate.get(
1531: "loadbalancer.setsavepoint.failed.stack",
1532: new String[] { savepointName,
1533: String.valueOf(tid) })
1534: + "\n";
1535: SQLException ex = SQLExceptionFactory
1536: .getSQLException(exceptions, errorMsg);
1537: logger.error(ex.getMessage());
1538: throw ex;
1539: }
1540: }
1541: }
1542: }
1543:
1544: /**
1545: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1546: * long)
1547: */
1548: public void closePersistentConnection(String login,
1549: long persistentConnectionId) throws SQLException {
1550: /*
1551: * We assume a synchronous execution and connection closing can only come
1552: * after all requests have been executed in that connection. We post to all
1553: * backends and let the task deal with whether that backend had a persistent
1554: * connection or not.
1555: */
1556:
1557: String requestDescription = "closing persistent connection "
1558: + persistentConnectionId;
1559: ClosePersistentConnectionTask task = null;
1560: try {
1561: int nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1562: requestDescription);
1563:
1564: task = new ClosePersistentConnectionTask(
1565: getNbToWait(nbOfThreads), nbOfThreads, login,
1566: persistentConnectionId);
1567:
1568: // Post the task in the non-conflicting queues.
1569: synchronized (enabledBackends) {
1570: for (int i = 0; i < nbOfThreads; i++) {
1571: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1572: .get(i);
1573: backend.getTaskQueues()
1574: .addTaskToBackendTotalOrderQueue(task);
1575: }
1576: }
1577:
1578: // Release the lock
1579: backendListLock.releaseRead();
1580: } finally {
1581: if (task == null) { // Happens if we had a NoMoreBackendException
1582: task = new ClosePersistentConnectionTask(0, 0, login,
1583: persistentConnectionId);
1584: }
1585: }
1586: }
1587:
1588: /**
1589: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1590: * long)
1591: */
1592: public void openPersistentConnection(String login,
1593: long persistentConnectionId) throws SQLException {
1594: String requestDescription = "opening persistent connection "
1595: + persistentConnectionId;
1596: int nbOfThreads = 0;
1597: OpenPersistentConnectionTask task = null;
1598: try {
1599: nbOfThreads = acquireLockAndCheckNbOfThreads(null,
1600: requestDescription);
1601:
1602: task = new OpenPersistentConnectionTask(
1603: getNbToWait(nbOfThreads), nbOfThreads, login,
1604: persistentConnectionId);
1605:
1606: // Post the task in the non-conflicting queues.
1607: synchronized (enabledBackends) {
1608: for (int i = 0; i < nbOfThreads; i++) {
1609: DatabaseBackend backend = (DatabaseBackend) enabledBackends
1610: .get(i);
1611: backend.getTaskQueues()
1612: .addTaskToBackendTotalOrderQueue(task);
1613: }
1614: }
1615:
1616: // Release the lock
1617: backendListLock.releaseRead();
1618:
1619: synchronized (task) {
1620: if (!task.hasCompleted())
1621: try {
1622: waitForTaskCompletion(0, requestDescription,
1623: task);
1624: } catch (SQLException ignore) {
1625: }
1626: }
1627: } finally {
1628: if (logger.isDebugEnabled())
1629: logger.debug(requestDescription + " completed on "
1630: + nbOfThreads + " backends.");
1631: }
1632: }
1633:
1634: /**
1635: * Enables a Backend that was previously disabled.
1636: * <p>
1637: * Ask the corresponding connection manager to initialize the connections if
1638: * needed.
1639: * <p>
1640: * No sanity checks are performed by this function.
1641: *
1642: * @param db the database backend to enable
1643: * @param writeEnabled True if the backend must be enabled for writes
1644: * @throws SQLException if an error occurs
1645: */
1646: public void enableBackend(DatabaseBackend db, boolean writeEnabled)
1647: throws SQLException {
1648: if (!db.isInitialized())
1649: db.initializeConnections();
1650:
1651: if (writeEnabled && db.isWriteCanBeEnabled()) {
1652: // Create the new backend task queues
1653: db.setTaskQueues(new BackendTaskQueues(db,
1654: waitForCompletionPolicy, this .vdb
1655: .getRequestManager()));
1656: db.startWorkerThreads(this );
1657: db.enableWrite();
1658: }
1659:
1660: db.enableRead();
1661: try {
1662: backendListLock.acquireWrite();
1663: } catch (InterruptedException e) {
1664: logger
1665: .error(
1666: "Error while acquiring write lock in enableBackend",
1667: e);
1668: }
1669: synchronized (enabledBackends) {
1670: enabledBackends.add(db);
1671: }
1672: backendListLock.releaseWrite();
1673: }
1674:
1675: /**
1676: * Disables a backend that was previously enabled.
1677: * <p>
1678: * Ask the corresponding connection manager to finalize the connections if
1679: * needed.
1680: * <p>
1681: * No sanity checks are performed by this function.
1682: *
1683: * @param db the database backend to disable
1684: * @param forceDisable true if disabling must be forced on the backend
1685: * @throws SQLException if an error occurs
1686: */
1687: public void disableBackend(DatabaseBackend db, boolean forceDisable)
1688: throws SQLException {
1689: if (!db.disable()) {
1690: // Another thread has already started the disable process
1691: return;
1692: }
1693: synchronized (this ) {
1694: if (forceDisable)
1695: db.shutdownConnectionManagers();
1696: db.terminateWorkerThreads();
1697:
1698: if (db.isInitialized())
1699: db.finalizeConnections();
1700:
1701: try {
1702: backendListLock.acquireWrite();
1703: } catch (InterruptedException e) {
1704: logger
1705: .error(
1706: "Error while acquiring write lock in enableBackend",
1707: e);
1708: }
1709: synchronized (enabledBackends) {
1710: enabledBackends.remove(db);
1711: if (enabledBackends.isEmpty()) {
1712: // Cleanup schema for any remaining locks
1713: this .vdb.getRequestManager().setDatabaseSchema(
1714: null, false);
1715: }
1716: }
1717: backendListLock.releaseWrite();
1718: }
1719: }
1720:
1721: /**
1722: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#setWeight(String,
1723: * int)
1724: */
1725: public void setWeight(String name, int w) throws SQLException {
1726: throw new SQLException(
1727: "Weight is not supported with this load balancer");
1728: }
1729:
1730: /*
1731: * Debug/Monitoring
1732: */
1733:
1734: /**
1735: * Get information about the Request load balancer
1736: *
1737: * @return <code>String</code> containing information
1738: */
1739: public String getInformation() {
1740: return "RAIDb-0 Request load balancer\n";
1741: }
1742:
1743: /**
1744: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
1745: */
1746: public String getXmlImpl() {
1747: StringBuffer info = new StringBuffer();
1748: info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
1749: createTablePolicy.getXml();
1750: info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
1751: return info.toString();
1752: }
1753:
1754: }
|