0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Contact: sequoia@continuent.org
0007: *
0008: * Licensed under the Apache License, Version 2.0 (the "License");
0009: * you may not use this file except in compliance with the License.
0010: * You may obtain a copy of the License at
0011: *
0012: * http://www.apache.org/licenses/LICENSE-2.0
0013: *
0014: * Unless required by applicable law or agreed to in writing, software
0015: * distributed under the License is distributed on an "AS IS" BASIS,
0016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017: * See the License for the specific language governing permissions and
0018: * limitations under the License.
0019:
0020:
0021: * Free Software Foundation; either version 2.1 of the License, or any later
0022: * version.
0023: *
0024: * This library is distributed in the hope that it will be useful, but WITHOUT
0025: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0026: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
0027: * for more details.
0028: *
0029: * You should have received a copy of the GNU Lesser General Public License
0030: * along with this library; if not, write to the Free Software Foundation,
0031: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
0032: *
0033: * Initial developer(s): Emmanuel Cecchet.
0034: * Contributor(s): Jaco Swart, Jean-Bernard van Zuylen.
0035: */package org.continuent.sequoia.controller.loadbalancer.paralleldb;
0036:
0037: import java.sql.Connection;
0038: import java.sql.SQLException;
0039: import java.sql.Savepoint;
0040: import java.util.Hashtable;
0041: import java.util.List;
0042:
0043: import org.continuent.sequoia.common.exceptions.BadConnectionException;
0044: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0045: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0046: import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
0047: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0048: import org.continuent.sequoia.common.i18n.Translate;
0049: import org.continuent.sequoia.common.log.Trace;
0050: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0051: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0052: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0053: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0054: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0055: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0056: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0057: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0058: import org.continuent.sequoia.controller.connection.PooledConnection;
0059: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0060: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0061: import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0062: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0063: import org.continuent.sequoia.controller.requests.AbstractRequest;
0064: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0065: import org.continuent.sequoia.controller.requests.ParsingGranularities;
0066: import org.continuent.sequoia.controller.requests.SelectRequest;
0067: import org.continuent.sequoia.controller.requests.StoredProcedure;
0068: import org.continuent.sequoia.controller.requests.UnknownReadRequest;
0069: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0070:
/**
 * This class defines a ParallelDB. These are generic functions for all
 * ParallelDB load balancers.
 * <p>
 * Read and write queries are load balanced on the backends without any
 * replication (assuming that the underlying parallel database takes care of
 * data replication). The load balancers provide failover for reads and writes.
 *
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
 *         </a>
 * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
 * @version 1.0
 */
0090: public abstract class ParallelDB extends AbstractLoadBalancer {
0091: // transaction id -> DatabaseBackend
0092: private Hashtable backendPerTransactionId;
0093: private int numberOfEnabledBackends = 0;
0094:
0095: static Trace endUserLogger = Trace
0096: .getLogger("org.continuent.sequoia.enduser");
0097:
/**
 * Builds a <code>ParallelDB</code> load balancer. The parent class is
 * initialised with the <code>SingleDB</code> RAIDb level and the
 * <code>NO_PARSING</code> granularity, and the transaction-to-backend table
 * starts out empty.
 *
 * @param vdb the virtual database this load balancer belongs to.
 * @throws SQLException if an error occurs
 */
public ParallelDB(VirtualDatabase vdb) throws SQLException {
    super(vdb, RAIDbLevels.SingleDB, ParsingGranularities.NO_PARSING);
    this.backendPerTransactionId = new Hashtable();
}
0110:
0111: //
0112: // Request handling
0113: //
0114:
0115: /**
0116: * Choose a backend using the implementation specific load balancing algorithm
0117: * for read request execution.
0118: *
0119: * @param request request to execute
0120: * @return the chosen backend
0121: * @throws SQLException if an error occurs
0122: */
0123: public abstract DatabaseBackend chooseBackendForReadRequest(
0124: AbstractRequest request) throws SQLException;
0125:
0126: /**
0127: * Choose a backend using the implementation specific load balancing algorithm
0128: * for write request execution.
0129: *
0130: * @param request request to execute
0131: * @return the chosen backend
0132: * @throws SQLException if an error occurs
0133: */
0134: public abstract DatabaseBackend chooseBackendForWriteRequest(
0135: AbstractWriteRequest request) throws SQLException;
0136:
0137: /**
0138: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecuteQuery(SelectRequest,
0139: * MetadataCache)
0140: */
0141: public ControllerResultSet statementExecuteQuery(
0142: SelectRequest request, MetadataCache metadataCache)
0143: throws SQLException {
0144: DatabaseBackend backend;
0145: if (request.isAutoCommit())
0146: backend = chooseBackendForReadRequest(request);
0147: else
0148: backend = (DatabaseBackend) backendPerTransactionId
0149: .get(new Long(request.getTransactionId()));
0150:
0151: if (backend == null)
0152: throw new SQLException(Translate.get(
0153: "loadbalancer.execute.no.backend.found", request
0154: .getSqlShortForm(vdb
0155: .getSqlShortFormLength())));
0156:
0157: ControllerResultSet rs = null;
0158: // Execute the request on the chosen backend
0159: try {
0160: rs = executeStatementExecuteQueryOnBackend(request,
0161: backend, metadataCache);
0162: } catch (UnreachableBackendException urbe) {
0163: // Notify failure in recovery log
0164: if (recoveryLog != null)
0165: recoveryLog.logRequestCompletion(request.getLogId(),
0166: false, request.getExecTimeInMs());
0167:
0168: // Try to execute query on different backend
0169: return statementExecuteQuery(request, metadataCache);
0170: } catch (SQLException se) {
0171: // Notify failure in recovery log
0172: if (recoveryLog != null)
0173: recoveryLog.logRequestCompletion(request.getLogId(),
0174: false, request.getExecTimeInMs());
0175:
0176: String msg = Translate.get("loadbalancer.request.failed",
0177: new String[] { String.valueOf(request.getId()),
0178: se.getMessage() });
0179: if (logger.isInfoEnabled())
0180: logger.info(msg);
0181: throw new SQLException(msg);
0182: } catch (RuntimeException e) {
0183: // Notify failure in recovery log
0184: if (recoveryLog != null)
0185: recoveryLog.logRequestCompletion(request.getLogId(),
0186: false, request.getExecTimeInMs());
0187:
0188: String msg = Translate.get(
0189: "loadbalancer.request.failed.on.backend",
0190: new String[] {
0191: request.getSqlShortForm(vdb
0192: .getSqlShortFormLength()),
0193: backend.getName(), e.getMessage() });
0194: logger.error(msg, e);
0195: throw new SQLException(msg);
0196: }
0197:
0198: return rs;
0199: }
0200:
0201: /**
0202: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecuteUpdate(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
0203: */
0204: public ExecuteUpdateResult statementExecuteUpdate(
0205: AbstractWriteRequest request)
0206: throws AllBackendsFailedException, SQLException {
0207: // Handle macros
0208: handleMacros(request);
0209:
0210: // Log lazy begin if needed
0211: if (request.isLazyTransactionStart())
0212: this .vdb.getRequestManager().logLazyTransactionBegin(
0213: request.getTransactionId());
0214:
0215: // Log request
0216: if (recoveryLog != null)
0217: recoveryLog.logRequestExecuting(request);
0218:
0219: DatabaseBackend backend;
0220: if (request.isAutoCommit())
0221: backend = chooseBackendForWriteRequest(request);
0222: else
0223: backend = (DatabaseBackend) backendPerTransactionId
0224: .get(new Long(request.getTransactionId()));
0225:
0226: if (backend == null)
0227: throw new SQLException(Translate.get(
0228: "loadbalancer.execute.no.backend.found", request
0229: .getSqlShortForm(vdb
0230: .getSqlShortFormLength())));
0231:
0232: ExecuteUpdateResult result;
0233: // Execute the request on the chosen backend
0234: try {
0235: result = executeStatementExecuteUpdateOnBackend(request,
0236: backend);
0237: } catch (UnreachableBackendException urbe) {
0238: // Notify failure in recovery log
0239: if (recoveryLog != null)
0240: recoveryLog.logRequestCompletion(request.getLogId(),
0241: false, request.getExecTimeInMs());
0242:
0243: // Try to execute query on different backend
0244: return statementExecuteUpdate(request);
0245: } catch (SQLException se) {
0246: // Notify failure in recovery log
0247: if (recoveryLog != null)
0248: recoveryLog.logRequestCompletion(request.getLogId(),
0249: false, request.getExecTimeInMs());
0250:
0251: String msg = Translate.get("loadbalancer.request.failed",
0252: new String[] { String.valueOf(request.getId()),
0253: se.getMessage() });
0254: if (logger.isInfoEnabled())
0255: logger.info(msg);
0256: throw new SQLException(msg);
0257: } catch (RuntimeException e) {
0258: // Notify failure in recovery log
0259: if (recoveryLog != null)
0260: recoveryLog.logRequestCompletion(request.getLogId(),
0261: false, request.getExecTimeInMs());
0262:
0263: String msg = Translate.get(
0264: "loadbalancer.request.failed.on.backend",
0265: new String[] {
0266: request.getSqlShortForm(vdb
0267: .getSqlShortFormLength()),
0268: backend.getName(), e.getMessage() });
0269: logger.error(msg, e);
0270: throw new SQLException(msg);
0271: }
0272:
0273: return result;
0274: }
0275:
0276: /**
0277: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecuteUpdateWithKeys(AbstractWriteRequest,
0278: * MetadataCache)
0279: */
0280: public GeneratedKeysResult statementExecuteUpdateWithKeys(
0281: AbstractWriteRequest request, MetadataCache metadataCache)
0282: throws AllBackendsFailedException, SQLException {
0283: // Handle macros
0284: handleMacros(request);
0285:
0286: // Log lazy begin if needed
0287: if (request.isLazyTransactionStart())
0288: this .vdb.getRequestManager().logLazyTransactionBegin(
0289: request.getTransactionId());
0290:
0291: // Log request
0292: if (recoveryLog != null)
0293: recoveryLog.logRequestExecuting(request);
0294:
0295: DatabaseBackend backend;
0296: if (request.isAutoCommit())
0297: backend = chooseBackendForWriteRequest(request);
0298: else
0299: backend = (DatabaseBackend) backendPerTransactionId
0300: .get(new Long(request.getTransactionId()));
0301:
0302: if (backend == null)
0303: throw new SQLException(Translate.get(
0304: "loadbalancer.execute.no.backend.found", request
0305: .getSqlShortForm(vdb
0306: .getSqlShortFormLength())));
0307:
0308: GeneratedKeysResult rs;
0309: // Execute the request on the chosen backend
0310: try {
0311: rs = executeStatementExecuteUpdateWithKeysOnBackend(
0312: request, backend, metadataCache);
0313: } catch (UnreachableBackendException urbe) {
0314: // Notify failure in recovery log
0315: if (recoveryLog != null)
0316: recoveryLog.logRequestCompletion(request.getLogId(),
0317: false, request.getExecTimeInMs());
0318:
0319: // Try to execute query on different backend
0320: return statementExecuteUpdateWithKeys(request,
0321: metadataCache);
0322: } catch (SQLException se) {
0323: // Notify failure in recovery log
0324: if (recoveryLog != null)
0325: recoveryLog.logRequestCompletion(request.getLogId(),
0326: false, request.getExecTimeInMs());
0327:
0328: String msg = Translate.get("loadbalancer.request.failed",
0329: new String[] { String.valueOf(request.getId()),
0330: se.getMessage() });
0331: if (logger.isInfoEnabled())
0332: logger.info(msg);
0333: throw new SQLException(msg);
0334: } catch (RuntimeException e) {
0335: // Notify failure in recovery log
0336: if (recoveryLog != null)
0337: recoveryLog.logRequestCompletion(request.getLogId(),
0338: false, request.getExecTimeInMs());
0339:
0340: String msg = Translate.get(
0341: "loadbalancer.request.failed.on.backend",
0342: new String[] {
0343: request.getSqlShortForm(vdb
0344: .getSqlShortFormLength()),
0345: backend.getName(), e.getMessage() });
0346: logger.error(msg, e);
0347: throw new SQLException(msg);
0348: }
0349:
0350: return rs;
0351: }
0352:
0353: /**
0354: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
0355: * MetadataCache)
0356: */
0357: public ExecuteResult statementExecute(AbstractRequest request,
0358: MetadataCache metadataCache) throws SQLException,
0359: AllBackendsFailedException {
0360: // Handle macros
0361: handleMacros(request);
0362:
0363: // Log lazy begin if needed
0364: if (request.isLazyTransactionStart())
0365: this .vdb.getRequestManager().logLazyTransactionBegin(
0366: request.getTransactionId());
0367:
0368: // Log request
0369: if (recoveryLog != null)
0370: recoveryLog.logRequestExecuting(request);
0371:
0372: DatabaseBackend backend;
0373: if (request.isAutoCommit())
0374: backend = chooseBackendForReadRequest(request);
0375: else
0376: backend = (DatabaseBackend) backendPerTransactionId
0377: .get(new Long(request.getTransactionId()));
0378:
0379: if (backend == null)
0380: throw new SQLException(Translate.get(
0381: "loadbalancer.storedprocedure.no.backend.found",
0382: request
0383: .getSqlShortForm(vdb
0384: .getSqlShortFormLength())));
0385:
0386: ExecuteResult rs = null;
0387: // Execute the request on the chosen backend
0388: try {
0389: rs = executeStatementExecuteOnBackend(request, backend,
0390: metadataCache);
0391: } catch (UnreachableBackendException urbe) {
0392: // Notify failure in recovery log
0393: recoveryLog.logRequestCompletion(request.getLogId(), false,
0394: request.getExecTimeInMs());
0395:
0396: // Try to execute query on different backend
0397: return statementExecute(request, metadataCache);
0398: } catch (SQLException se) {
0399: // Notify failure in recovery log
0400: recoveryLog.logRequestCompletion(request.getLogId(), false,
0401: request.getExecTimeInMs());
0402:
0403: String msg = Translate.get(
0404: "loadbalancer.storedprocedure.failed",
0405: new String[] { String.valueOf(request.getId()),
0406: se.getMessage() });
0407: if (logger.isInfoEnabled())
0408: logger.info(msg);
0409: throw new SQLException(msg);
0410: } catch (RuntimeException e) {
0411: // Notify failure in recovery log
0412: recoveryLog.logRequestCompletion(request.getLogId(), false,
0413: request.getExecTimeInMs());
0414:
0415: String msg = Translate.get(
0416: "loadbalancer.storedprocedure.failed.on.backend",
0417: new String[] {
0418: request.getSqlShortForm(vdb
0419: .getSqlShortFormLength()),
0420: backend.getName(), e.getMessage() });
0421: logger.error(msg, e);
0422: throw new SQLException(msg);
0423: }
0424:
0425: return rs;
0426: }
0427:
0428: /**
0429: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecuteQuery(StoredProcedure,
0430: * MetadataCache)
0431: */
0432: public ControllerResultSet readOnlyCallableStatementExecuteQuery(
0433: StoredProcedure proc, MetadataCache metadataCache)
0434: throws SQLException {
0435: return callableStatementExecuteQuery(proc, metadataCache);
0436: }
0437:
0438: /**
0439: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecute(StoredProcedure,
0440: * MetadataCache)
0441: */
0442: public ExecuteResult readOnlyCallableStatementExecute(
0443: StoredProcedure proc, MetadataCache metadataCache)
0444: throws SQLException {
0445: return callableStatementExecute(proc, metadataCache);
0446: }
0447:
0448: /**
0449: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
0450: * MetadataCache)
0451: */
0452: public ControllerResultSet callableStatementExecuteQuery(
0453: StoredProcedure proc, MetadataCache metadataCache)
0454: throws SQLException {
0455: // Handle macros
0456: handleMacros(proc);
0457:
0458: // Log lazy begin if needed
0459: if (proc.isLazyTransactionStart())
0460: this .vdb.getRequestManager().logLazyTransactionBegin(
0461: proc.getTransactionId());
0462:
0463: // Log request
0464: if (recoveryLog != null)
0465: recoveryLog.logRequestExecuting(proc);
0466:
0467: DatabaseBackend backend;
0468: if (proc.isAutoCommit())
0469: backend = chooseBackendForReadRequest(proc);
0470: else
0471: backend = (DatabaseBackend) backendPerTransactionId
0472: .get(new Long(proc.getTransactionId()));
0473:
0474: if (backend == null)
0475: throw new SQLException(Translate.get(
0476: "loadbalancer.storedprocedure.no.backend.found",
0477: proc.getSqlShortForm(vdb.getSqlShortFormLength())));
0478:
0479: ControllerResultSet rs = null;
0480: // Execute the request on the chosen backend
0481: try {
0482: rs = executeCallableStatementExecuteQueryOnBackend(proc,
0483: backend, metadataCache);
0484: } catch (UnreachableBackendException urbe) {
0485: // Notify failure in recovery log
0486: if (recoveryLog != null)
0487: recoveryLog.logRequestCompletion(proc.getLogId(),
0488: false, proc.getExecTimeInMs());
0489:
0490: // Try to execute query on different backend
0491: return callableStatementExecuteQuery(proc, metadataCache);
0492: } catch (SQLException se) {
0493: // Notify failure in recovery log
0494: if (recoveryLog != null)
0495: recoveryLog.logRequestCompletion(proc.getLogId(),
0496: false, proc.getExecTimeInMs());
0497:
0498: String msg = Translate.get(
0499: "loadbalancer.storedprocedure.failed",
0500: new String[] { String.valueOf(proc.getId()),
0501: se.getMessage() });
0502: if (logger.isInfoEnabled())
0503: logger.info(msg);
0504: throw new SQLException(msg);
0505: } catch (RuntimeException e) {
0506: // Notify failure in recovery log
0507: if (recoveryLog != null)
0508: recoveryLog.logRequestCompletion(proc.getLogId(),
0509: false, proc.getExecTimeInMs());
0510:
0511: String msg = Translate.get(
0512: "loadbalancer.storedprocedure.failed.on.backend",
0513: new String[] {
0514: proc.getSqlShortForm(vdb
0515: .getSqlShortFormLength()),
0516: backend.getName(), e.getMessage() });
0517: logger.error(msg, e);
0518: throw new SQLException(msg);
0519: }
0520:
0521: return rs;
0522: }
0523:
0524: /**
0525: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
0526: */
0527: public ExecuteUpdateResult callableStatementExecuteUpdate(
0528: StoredProcedure proc) throws SQLException {
0529: // Handle macros
0530: handleMacros(proc);
0531:
0532: // Log lazy begin if needed
0533: if (proc.isLazyTransactionStart())
0534: this .vdb.getRequestManager().logLazyTransactionBegin(
0535: proc.getTransactionId());
0536:
0537: // Log request
0538: if (recoveryLog != null)
0539: recoveryLog.logRequestExecuting(proc);
0540:
0541: DatabaseBackend backend;
0542: if (proc.isAutoCommit())
0543: backend = chooseBackendForReadRequest(proc);
0544: else
0545: backend = (DatabaseBackend) backendPerTransactionId
0546: .get(new Long(proc.getTransactionId()));
0547:
0548: if (backend == null)
0549: throw new SQLException(Translate.get(
0550: "loadbalancer.storedprocedure.no.backend.found",
0551: proc.getSqlShortForm(vdb.getSqlShortFormLength())));
0552:
0553: ExecuteUpdateResult result;
0554: // Execute the request on the chosen backend
0555: try {
0556: result = executeCallableStatementExecuteUpdateOnBackend(
0557: proc, backend);
0558: } catch (UnreachableBackendException urbe) {
0559: // Notify failure in recovery log
0560: if (recoveryLog != null)
0561: recoveryLog.logRequestCompletion(proc.getLogId(),
0562: false, proc.getExecTimeInMs());
0563:
0564: // Try to execute query on different backend
0565: return callableStatementExecuteUpdate(proc);
0566: } catch (SQLException se) {
0567: // Notify failure in recovery log
0568: if (recoveryLog != null)
0569: recoveryLog.logRequestCompletion(proc.getLogId(),
0570: false, proc.getExecTimeInMs());
0571:
0572: String msg = Translate.get(
0573: "loadbalancer.storedprocedure.failed",
0574: new String[] { String.valueOf(proc.getId()),
0575: se.getMessage() });
0576: if (logger.isInfoEnabled())
0577: logger.info(msg);
0578: throw new SQLException(msg);
0579: } catch (RuntimeException e) {
0580: // Notify failure in recovery log
0581: if (recoveryLog != null)
0582: recoveryLog.logRequestCompletion(proc.getLogId(),
0583: false, proc.getExecTimeInMs());
0584:
0585: String msg = Translate.get(
0586: "loadbalancer.storedprocedure.failed.on.backend",
0587: new String[] {
0588: proc.getSqlShortForm(vdb
0589: .getSqlShortFormLength()),
0590: backend.getName(), e.getMessage() });
0591: logger.error(msg, e);
0592: throw new SQLException(msg);
0593: }
0594:
0595: return result;
0596: }
0597:
0598: /**
0599: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(StoredProcedure,
0600: * MetadataCache)
0601: */
0602: public ExecuteResult callableStatementExecute(StoredProcedure proc,
0603: MetadataCache metadataCache) throws SQLException {
0604: // Handle macros
0605: handleMacros(proc);
0606:
0607: // Log lazy begin if needed
0608: if (proc.isLazyTransactionStart())
0609: this .vdb.getRequestManager().logLazyTransactionBegin(
0610: proc.getTransactionId());
0611:
0612: // Log request
0613: if (recoveryLog != null)
0614: recoveryLog.logRequestExecuting(proc);
0615:
0616: DatabaseBackend backend;
0617: if (proc.isAutoCommit())
0618: backend = chooseBackendForReadRequest(proc);
0619: else
0620: backend = (DatabaseBackend) backendPerTransactionId
0621: .get(new Long(proc.getTransactionId()));
0622:
0623: if (backend == null)
0624: throw new SQLException(Translate.get(
0625: "loadbalancer.storedprocedure.no.backend.found",
0626: proc.getSqlShortForm(vdb.getSqlShortFormLength())));
0627:
0628: ExecuteResult rs = null;
0629: // Execute the request on the chosen backend
0630: try {
0631: rs = executeCallableStatementExecuteOnBackend(proc,
0632: backend, metadataCache);
0633: } catch (UnreachableBackendException urbe) {
0634: // Notify failure in recovery log
0635: if (recoveryLog != null)
0636: recoveryLog.logRequestCompletion(proc.getLogId(),
0637: false, proc.getExecTimeInMs());
0638:
0639: // Try to execute query on different backend
0640: ExecuteResult result = callableStatementExecute(proc,
0641: metadataCache);
0642:
0643: return result;
0644: } catch (SQLException se) {
0645: // Notify failure in recovery log
0646: if (recoveryLog != null)
0647: recoveryLog.logRequestCompletion(proc.getLogId(),
0648: false, proc.getExecTimeInMs());
0649:
0650: String msg = Translate.get(
0651: "loadbalancer.storedprocedure.failed",
0652: new String[] { String.valueOf(proc.getId()),
0653: se.getMessage() });
0654: if (logger.isInfoEnabled())
0655: logger.info(msg);
0656: throw new SQLException(msg);
0657: } catch (RuntimeException e) {
0658: // Notify failure in recovery log
0659: if (recoveryLog != null)
0660: recoveryLog.logRequestCompletion(proc.getLogId(),
0661: false, proc.getExecTimeInMs());
0662:
0663: String msg = Translate.get(
0664: "loadbalancer.storedprocedure.failed.on.backend",
0665: new String[] {
0666: proc.getSqlShortForm(vdb
0667: .getSqlShortFormLength()),
0668: backend.getName(), e.getMessage() });
0669: logger.error(msg, e);
0670: throw new SQLException(msg);
0671: }
0672:
0673: return rs;
0674: }
0675:
0676: /**
0677: * Execute a read request on the selected backend.
0678: *
0679: * @param request the request to execute
0680: * @param backend the backend that will execute the request
0681: * @param metadataCache MetadataCache (null if none)
0682: * @return the ControllerResultSet
0683: * @throws SQLException if an error occurs
0684: */
0685: private ControllerResultSet executeStatementExecuteQueryOnBackend(
0686: SelectRequest request, DatabaseBackend backend,
0687: MetadataCache metadataCache) throws SQLException,
0688: UnreachableBackendException {
0689: // Ok, we have a backend, let's execute the request
0690: AbstractConnectionManager cm = backend
0691: .getConnectionManager(request.getLogin());
0692:
0693: // Sanity check
0694: if (cm == null) {
0695: String msg = Translate.get(
0696: "loadbalancer.connectionmanager.not.found",
0697: new String[] { request.getLogin(),
0698: backend.getName() });
0699: logger.error(msg);
0700: throw new SQLException(msg);
0701: }
0702:
0703: // Execute the query
0704: if (request.isAutoCommit()) {
0705: ControllerResultSet rs = null;
0706: boolean badConnection;
0707: do {
0708: badConnection = false;
0709: // Use a connection just for this request
0710: PooledConnection pc = null;
0711: try {
0712: pc = cm.retrieveConnectionInAutoCommit(request);
0713: } catch (UnreachableBackendException e1) {
0714: String msg = Translate
0715: .get(
0716: "loadbalancer.backend.disabling.unreachable",
0717: backend.getName());
0718: logger.error(msg);
0719: endUserLogger.error(msg);
0720: disableBackend(backend, true);
0721: throw new UnreachableBackendException(Translate
0722: .get("loadbalancer.backend.unreacheable",
0723: backend.getName()));
0724: }
0725:
0726: // Sanity check
0727: if (pc == null)
0728: throw new SQLException(Translate.get(
0729: "loadbalancer.backend.no.connection",
0730: backend.getName()));
0731:
0732: // Execute Query
0733: try {
0734: rs = executeStatementExecuteQueryOnBackend(request,
0735: backend, null, pc.getConnection(),
0736: metadataCache);
0737: cm.releaseConnectionInAutoCommit(request, pc);
0738: } catch (BadConnectionException e) { // Get rid of the bad connection
0739: cm.deleteConnection(pc);
0740: badConnection = true;
0741: } catch (Throwable e) {
0742: cm.releaseConnectionInAutoCommit(request, pc);
0743: throw new SQLException(
0744: Translate
0745: .get(
0746: "loadbalancer.request.failed.on.backend",
0747: new String[] {
0748: request
0749: .getSqlShortForm(vdb
0750: .getSqlShortFormLength()),
0751: backend.getName(),
0752: e.getMessage() }));
0753: }
0754: } while (badConnection);
0755: if (logger.isDebugEnabled())
0756: logger.debug(Translate.get("loadbalancer.execute.on",
0757: new String[] { String.valueOf(request.getId()),
0758: backend.getName() }));
0759: return rs;
0760: } else { // Inside a transaction
0761: Connection c;
0762: long tid = request.getTransactionId();
0763:
0764: try {
0765: c = backend
0766: .getConnectionForTransactionAndLazyBeginIfNeeded(
0767: request, cm);
0768: } catch (UnreachableBackendException e1) {
0769: String msg = Translate.get(
0770: "loadbalancer.backend.disabling.unreachable",
0771: backend.getName());
0772: logger.error(msg);
0773: endUserLogger.error(msg);
0774: disableBackend(backend, true);
0775: throw new SQLException(Translate.get(
0776: "loadbalancer.backend.unreacheable", backend
0777: .getName()));
0778: } catch (NoTransactionStartWhenDisablingException e) {
0779: String msg = Translate.get(
0780: "loadbalancer.backend.is.disabling",
0781: new String[] {
0782: request.getSqlShortForm(vdb
0783: .getSqlShortFormLength()),
0784: backend.getName() });
0785: logger.error(msg);
0786: throw new SQLException(msg);
0787: }
0788:
0789: // Sanity check
0790: if (c == null)
0791: throw new SQLException(Translate.get(
0792: "loadbalancer.unable.retrieve.connection",
0793: new String[] { String.valueOf(tid),
0794: backend.getName() }));
0795:
0796: // Execute Query
0797: ControllerResultSet rs = null;
0798: try {
0799: rs = executeStatementExecuteQueryOnBackend(request,
0800: backend, null, c, metadataCache);
0801: } catch (BadConnectionException e) { // Connection failed, so did the transaction
0802: // Disable the backend.
0803: cm.deleteConnection(tid);
0804: String msg = Translate
0805: .get(
0806: "loadbalancer.backend.disabling.connection.failure",
0807: backend.getName());
0808: logger.error(msg);
0809: endUserLogger.error(msg);
0810: disableBackend(backend, true);
0811: throw new SQLException(msg);
0812: } catch (Throwable e) {
0813: throw new SQLException(Translate.get(
0814: "loadbalancer.request.failed.on.backend",
0815: new String[] {
0816: request.getSqlShortForm(vdb
0817: .getSqlShortFormLength()),
0818: backend.getName(), e.getMessage() }));
0819: }
0820: if (logger.isDebugEnabled())
0821: logger.debug(Translate.get(
0822: "loadbalancer.execute.transaction.on",
0823: new String[] { String.valueOf(tid),
0824: String.valueOf(request.getId()),
0825: backend.getName() }));
0826: return rs;
0827: }
0828: }
0829:
0830: /**
0831: * Execute a write request on the selected backend.
0832: *
0833: * @param request the request to execute
0834: * @param backend the backend that will execute the request
0835: * @return the number of modified rows
0836: * @throws SQLException if an error occurs
0837: */
0838: private ExecuteUpdateResult executeStatementExecuteUpdateOnBackend(
0839: AbstractWriteRequest request, DatabaseBackend backend)
0840: throws SQLException, UnreachableBackendException {
0841: if (backend == null)
0842: throw new NoMoreBackendException(Translate.get(
0843: "loadbalancer.execute.no.backend.available",
0844: request.getId()));
0845:
0846: try {
0847: AbstractConnectionManager cm = backend
0848: .getConnectionManager(request.getLogin());
0849: if (request.isAutoCommit()) { // Use a connection just for this request
0850: PooledConnection c = null;
0851: try {
0852: c = cm.retrieveConnectionInAutoCommit(request);
0853: } catch (UnreachableBackendException e1) {
0854: String backendName = backend.getName();
0855: String msg = Translate
0856: .get(
0857: "loadbalancer.backend.disabling.unreachable",
0858: backendName);
0859: logger.error(msg);
0860: endUserLogger.error(msg);
0861: disableBackend(backend, true);
0862: throw new UnreachableBackendException(Translate
0863: .get("loadbalancer.backend.unreacheable",
0864: backendName));
0865: }
0866:
0867: // Sanity check
0868: if (c == null)
0869: throw new UnreachableBackendException(Translate
0870: .get("loadbalancer.backend.no.connection",
0871: backend.getName()));
0872:
0873: // Execute Query
0874: ExecuteUpdateResult result;
0875: try {
0876: result = executeStatementExecuteUpdateOnBackend(
0877: request, backend, null, c);
0878: } catch (Exception e) {
0879: throw new SQLException(
0880: Translate
0881: .get(
0882: "loadbalancer.request.failed.on.backend",
0883: new String[] {
0884: request
0885: .getSqlShortForm(vdb
0886: .getSqlShortFormLength()),
0887: backend.getName(),
0888: e.getMessage() }));
0889: } finally {
0890: cm.releaseConnectionInAutoCommit(request, c);
0891: }
0892: return result;
0893: } else { // Re-use the connection used by this transaction
0894: PooledConnection c = cm
0895: .retrieveConnectionForTransaction(request
0896: .getTransactionId());
0897:
0898: // Sanity check
0899: if (c == null)
0900: throw new SQLException(Translate.get(
0901: "loadbalancer.unable.retrieve.connection",
0902: new String[] {
0903: String.valueOf(request
0904: .getTransactionId()),
0905: backend.getName() }));
0906:
0907: // Execute Query
0908: ExecuteUpdateResult result;
0909: try {
0910: result = executeStatementExecuteUpdateOnBackend(
0911: request, backend, null, c);
0912: return result;
0913: } catch (Exception e) {
0914: throw new SQLException(
0915: Translate
0916: .get(
0917: "loadbalancer.request.failed.on.backend",
0918: new String[] {
0919: request
0920: .getSqlShortForm(vdb
0921: .getSqlShortFormLength()),
0922: backend.getName(),
0923: e.getMessage() }));
0924: }
0925: }
0926: } catch (RuntimeException e) {
0927: String msg = Translate.get(
0928: "loadbalancer.request.failed.on.backend",
0929: new String[] {
0930: request.getSqlShortForm(vdb
0931: .getSqlShortFormLength()),
0932: backend.getName(), e.getMessage() });
0933: logger.fatal(msg, e);
0934: throw new SQLException(msg);
0935: }
0936: }
0937:
0938: /**
0939: * Execute a write request on the selected backend and return the
0940: * autogenerated keys.
0941: *
0942: * @param request the request to execute
0943: * @param backend the backend that will execute the request
0944: * @param metadataCache MetadataCache (null if none)
0945: * @return the ResultSet containing the auto-generated keys
0946: * @throws SQLException if an error occurs
0947: */
0948: private GeneratedKeysResult executeStatementExecuteUpdateWithKeysOnBackend(
0949: AbstractWriteRequest request, DatabaseBackend backend,
0950: MetadataCache metadataCache) throws SQLException,
0951: UnreachableBackendException {
0952: if (backend == null)
0953: throw new NoMoreBackendException(Translate.get(
0954: "loadbalancer.execute.no.backend.available",
0955: request.getId()));
0956:
0957: if (!backend.getDriverCompliance().supportGetGeneratedKeys())
0958: throw new SQLException(
0959: Translate
0960: .get(
0961: "loadbalancer.backend.autogeneratedkeys.unsupported",
0962: backend.getName()));
0963:
0964: try {
0965: AbstractConnectionManager cm = backend
0966: .getConnectionManager(request.getLogin());
0967: if (request.isAutoCommit()) { // Use a connection just for this request
0968: PooledConnection c = null;
0969: try {
0970: c = cm.retrieveConnectionInAutoCommit(request);
0971: } catch (UnreachableBackendException e1) {
0972: String backendName = backend.getName();
0973: String msg = Translate
0974: .get(
0975: "loadbalancer.backend.disabling.unreachable",
0976: backendName);
0977: logger.error(msg);
0978: endUserLogger.error(msg);
0979: disableBackend(backend, true);
0980: throw new UnreachableBackendException(Translate
0981: .get("loadbalancer.backend.unreacheable",
0982: backendName));
0983: }
0984:
0985: // Sanity check
0986: if (c == null)
0987: throw new UnreachableBackendException(Translate
0988: .get("loadbalancer.backend.no.connection",
0989: backend.getName()));
0990:
0991: // Execute Query
0992: GeneratedKeysResult result;
0993: try {
0994: result = executeStatementExecuteUpdateWithKeysOnBackend(
0995: request, backend, null, c, metadataCache);
0996: } catch (Exception e) {
0997: throw new SQLException(
0998: Translate
0999: .get(
1000: "loadbalancer.request.failed.on.backend",
1001: new String[] {
1002: request
1003: .getSqlShortForm(vdb
1004: .getSqlShortFormLength()),
1005: backend.getName(),
1006: e.getMessage() }));
1007: } finally {
1008: cm.releaseConnectionInAutoCommit(request, c);
1009: }
1010: return result;
1011: } else { // Re-use the connection used by this transaction
1012: PooledConnection c = cm
1013: .retrieveConnectionForTransaction(request
1014: .getTransactionId());
1015:
1016: // Sanity check
1017: if (c == null)
1018: throw new SQLException(Translate.get(
1019: "loadbalancer.unable.retrieve.connection",
1020: new String[] {
1021: String.valueOf(request
1022: .getTransactionId()),
1023: backend.getName() }));
1024:
1025: // Execute Query
1026: try {
1027: return executeStatementExecuteUpdateWithKeysOnBackend(
1028: request, backend, null, c, metadataCache);
1029: } catch (Exception e) {
1030: throw new SQLException(
1031: Translate
1032: .get(
1033: "loadbalancer.request.failed.on.backend",
1034: new String[] {
1035: request
1036: .getSqlShortForm(vdb
1037: .getSqlShortFormLength()),
1038: backend.getName(),
1039: e.getMessage() }));
1040: }
1041: }
1042: } catch (RuntimeException e) {
1043: String msg = Translate.get("loadbalancer.request.failed",
1044: new String[] {
1045: request.getSqlShortForm(vdb
1046: .getSqlShortFormLength()),
1047: e.getMessage() });
1048: logger.fatal(msg, e);
1049: endUserLogger.fatal(msg);
1050: throw new SQLException(msg);
1051: }
1052: }
1053:
1054: /**
1055: * Execute a request that return multiple results on the selected backend.
1056: *
1057: * @param request the request to execute
1058: * @param backend the backend that will execute the request
1059: * @param metadataCache MetadataCache (null if none)
1060: * @return an <code>ExecuteResult</code> object
1061: * @throws SQLException if an error occurs
1062: */
1063: private ExecuteResult executeStatementExecuteOnBackend(
1064: AbstractRequest request, DatabaseBackend backend,
1065: MetadataCache metadataCache) throws SQLException,
1066: UnreachableBackendException {
1067: // Ok, we have a backend, let's execute the request
1068: AbstractConnectionManager cm = backend
1069: .getConnectionManager(request.getLogin());
1070:
1071: // Sanity check
1072: if (cm == null) {
1073: String msg = Translate.get(
1074: "loadbalancer.connectionmanager.not.found",
1075: new String[] { request.getLogin(),
1076: backend.getName() });
1077: logger.error(msg);
1078: throw new SQLException(msg);
1079: }
1080:
1081: // Execute the query
1082: if (request.isAutoCommit()) {
1083: // Use a connection just for this request
1084: PooledConnection c = null;
1085: try {
1086: c = cm.retrieveConnectionInAutoCommit(request);
1087: } catch (UnreachableBackendException e1) {
1088: String msg = Translate.get(
1089: "loadbalancer.backend.disabling.unreachable",
1090: backend.getName());
1091: logger.error(msg);
1092: endUserLogger.error(msg);
1093: disableBackend(backend, true);
1094: throw new UnreachableBackendException(Translate.get(
1095: "loadbalancer.backend.unreacheable", backend
1096: .getName()));
1097: }
1098:
1099: // Sanity check
1100: if (c == null)
1101: throw new UnreachableBackendException(Translate.get(
1102: "loadbalancer.backend.no.connection", backend
1103: .getName()));
1104:
1105: // Execute Query
1106: ExecuteResult rs = null;
1107: try {
1108: rs = AbstractLoadBalancer
1109: .executeStatementExecuteOnBackend(request,
1110: backend, null, c, metadataCache);
1111: } catch (Exception e) {
1112: throw new SQLException(Translate.get(
1113: "loadbalancer.request.failed.on.backend",
1114: new String[] {
1115: request.getSqlShortForm(vdb
1116: .getSqlShortFormLength()),
1117: backend.getName(), e.getMessage() }));
1118: } finally {
1119: cm.releaseConnectionInAutoCommit(request, c);
1120: }
1121: if (logger.isDebugEnabled())
1122: logger.debug(Translate.get("loadbalancer.request.on",
1123: new String[] { String.valueOf(request.getId()),
1124: backend.getName() }));
1125: return rs;
1126: } else { // Inside a transaction
1127: Connection c;
1128: long tid = request.getTransactionId();
1129:
1130: try {
1131: c = backend
1132: .getConnectionForTransactionAndLazyBeginIfNeeded(
1133: request, cm);
1134: } catch (UnreachableBackendException e1) {
1135: String msg = Translate.get(
1136: "loadbalancer.backend.disabling.unreachable",
1137: backend.getName());
1138: logger.error(msg);
1139: endUserLogger.error(msg);
1140: disableBackend(backend, true);
1141: throw new SQLException(Translate.get(
1142: "loadbalancer.backend.unreacheable", backend
1143: .getName()));
1144: } catch (NoTransactionStartWhenDisablingException e) {
1145: String msg = Translate.get(
1146: "loadbalancer.backend.is.disabling",
1147: new String[] {
1148: request.getSqlShortForm(vdb
1149: .getSqlShortFormLength()),
1150: backend.getName() });
1151: logger.error(msg);
1152: throw new SQLException(msg);
1153: }
1154:
1155: // Sanity check
1156: if (c == null)
1157: throw new SQLException(Translate.get(
1158: "loadbalancer.unable.retrieve.connection",
1159: new String[] { String.valueOf(tid),
1160: backend.getName() }));
1161:
1162: // Execute Query
1163: ExecuteResult rs;
1164: try {
1165: rs = AbstractLoadBalancer
1166: .executeStatementExecuteOnBackend(
1167: request,
1168: backend,
1169: null,
1170: cm
1171: .retrieveConnectionForTransaction(tid),
1172: metadataCache);
1173: } catch (Exception e) {
1174: throw new SQLException(Translate.get(
1175: "loadbalancer.request.failed.on.backend",
1176: new String[] {
1177: request.getSqlShortForm(vdb
1178: .getSqlShortFormLength()),
1179: backend.getName(), e.getMessage() }));
1180: }
1181: if (logger.isDebugEnabled())
1182: logger.debug(Translate.get(
1183: "loadbalancer.execute.transaction.on",
1184: new String[] { String.valueOf(tid),
1185: String.valueOf(request.getId()),
1186: backend.getName() }));
1187: return rs;
1188: }
1189: }
1190:
1191: /**
1192: * Execute a stored procedure on the selected backend.
1193: *
1194: * @param proc the stored procedure to execute
1195: * @param backend the backend that will execute the request
1196: * @param metadataCache MetadataCache (null if none)
1197: * @return the ControllerResultSet
1198: * @throws SQLException if an error occurs
1199: */
1200: private ControllerResultSet executeCallableStatementExecuteQueryOnBackend(
1201: StoredProcedure proc, DatabaseBackend backend,
1202: MetadataCache metadataCache) throws SQLException,
1203: UnreachableBackendException {
1204: // Ok, we have a backend, let's execute the request
1205: AbstractConnectionManager cm = backend
1206: .getConnectionManager(proc.getLogin());
1207:
1208: // Sanity check
1209: if (cm == null) {
1210: String msg = Translate
1211: .get("loadbalancer.connectionmanager.not.found",
1212: new String[] { proc.getLogin(),
1213: backend.getName() });
1214: logger.error(msg);
1215: throw new SQLException(msg);
1216: }
1217:
1218: // Execute the query
1219: if (proc.isAutoCommit()) {
1220: // Use a connection just for this request
1221: PooledConnection c = null;
1222: try {
1223: c = cm.retrieveConnectionInAutoCommit(proc);
1224: } catch (UnreachableBackendException e1) {
1225: String msg = Translate.get(
1226: "loadbalancer.backend.disabling.unreachable",
1227: backend.getName());
1228: logger.error(msg);
1229: endUserLogger.error(msg);
1230: disableBackend(backend, true);
1231: throw new UnreachableBackendException(Translate.get(
1232: "loadbalancer.backend.unreacheable", backend
1233: .getName()));
1234: }
1235:
1236: // Sanity check
1237: if (c == null)
1238: throw new UnreachableBackendException(Translate.get(
1239: "loadbalancer.backend.no.connection", backend
1240: .getName()));
1241:
1242: // Execute Query
1243: ControllerResultSet rs = null;
1244: try {
1245: rs = AbstractLoadBalancer
1246: .executeCallableStatementExecuteQueryOnBackend(
1247: proc, backend, null, c.getConnection(),
1248: metadataCache);
1249: } catch (Exception e) {
1250: throw new SQLException(
1251: Translate
1252: .get(
1253: "loadbalancer.storedprocedure.failed.on.backend",
1254: new String[] {
1255: proc
1256: .getSqlShortForm(vdb
1257: .getSqlShortFormLength()),
1258: backend.getName(),
1259: e.getMessage() }));
1260: } finally {
1261: cm.releaseConnectionInAutoCommit(proc, c);
1262: }
1263: if (logger.isDebugEnabled())
1264: logger.debug(Translate.get(
1265: "loadbalancer.storedprocedure.on",
1266: new String[] { String.valueOf(proc.getId()),
1267: backend.getName() }));
1268: return rs;
1269: } else { // Inside a transaction
1270: Connection c;
1271: long tid = proc.getTransactionId();
1272:
1273: try {
1274: c = backend
1275: .getConnectionForTransactionAndLazyBeginIfNeeded(
1276: proc, cm);
1277: } catch (UnreachableBackendException e1) {
1278: String msg = Translate.get(
1279: "loadbalancer.backend.disabling.unreachable",
1280: backend.getName());
1281: logger.error(msg);
1282: endUserLogger.error(msg);
1283: disableBackend(backend, true);
1284: throw new SQLException(Translate.get(
1285: "loadbalancer.backend.unreacheable", backend
1286: .getName()));
1287: } catch (NoTransactionStartWhenDisablingException e) {
1288: String msg = Translate.get(
1289: "loadbalancer.backend.is.disabling",
1290: new String[] {
1291: proc.getSqlShortForm(vdb
1292: .getSqlShortFormLength()),
1293: backend.getName() });
1294: logger.error(msg);
1295: throw new SQLException(msg);
1296: }
1297:
1298: // Sanity check
1299: if (c == null)
1300: throw new SQLException(Translate.get(
1301: "loadbalancer.unable.retrieve.connection",
1302: new String[] { String.valueOf(tid),
1303: backend.getName() }));
1304:
1305: // Execute Query
1306: ControllerResultSet rs;
1307: try {
1308: rs = AbstractLoadBalancer
1309: .executeCallableStatementExecuteQueryOnBackend(
1310: proc, backend, null, c, metadataCache);
1311: } catch (Exception e) {
1312: throw new SQLException(
1313: Translate
1314: .get(
1315: "loadbalancer.storedprocedure.failed.on.backend",
1316: new String[] {
1317: proc
1318: .getSqlShortForm(vdb
1319: .getSqlShortFormLength()),
1320: backend.getName(),
1321: e.getMessage() }));
1322: }
1323: if (logger.isDebugEnabled())
1324: logger.debug(Translate.get(
1325: "loadbalancer.execute.transaction.on",
1326: new String[] { String.valueOf(tid),
1327: String.valueOf(proc.getId()),
1328: backend.getName() }));
1329: return rs;
1330: }
1331: }
1332:
1333: /**
1334: * Execute a stored procedure on the selected backend.
1335: *
1336: * @param proc the stored procedure to execute
1337: * @param backend the backend that will execute the request
1338: * @return the ResultSet
1339: * @throws SQLException if an error occurs
1340: */
1341: private ExecuteUpdateResult executeCallableStatementExecuteUpdateOnBackend(
1342: StoredProcedure proc, DatabaseBackend backend)
1343: throws SQLException, UnreachableBackendException {
1344: // Ok, we have a backend, let's execute the request
1345: AbstractConnectionManager cm = backend
1346: .getConnectionManager(proc.getLogin());
1347:
1348: // Sanity check
1349: if (cm == null) {
1350: String msg = Translate
1351: .get("loadbalancer.connectionmanager.not.found",
1352: new String[] { proc.getLogin(),
1353: backend.getName() });
1354: logger.error(msg);
1355: throw new SQLException(msg);
1356: }
1357:
1358: // Execute the query
1359: if (proc.isAutoCommit()) {
1360: // Use a connection just for this request
1361: PooledConnection c = null;
1362: try {
1363: c = cm.retrieveConnectionInAutoCommit(proc);
1364: } catch (UnreachableBackendException e1) {
1365: String msg = Translate.get(
1366: "loadbalancer.backend.disabling.unreachable",
1367: backend.getName());
1368: logger.error(msg);
1369: endUserLogger.error(msg);
1370: disableBackend(backend, true);
1371: throw new UnreachableBackendException(Translate.get(
1372: "loadbalancer.backend.unreacheable", backend
1373: .getName()));
1374: }
1375:
1376: // Sanity check
1377: if (c == null)
1378: throw new UnreachableBackendException(Translate.get(
1379: "loadbalancer.backend.no.connection", backend
1380: .getName()));
1381:
1382: // Execute Query
1383: ExecuteUpdateResult result;
1384: try {
1385: result = AbstractLoadBalancer
1386: .executeCallableStatementExecuteUpdateOnBackend(
1387: proc, backend, null, c);
1388:
1389: // Warning! No way to detect if schema has been modified unless
1390: // we ask the backend again using DatabaseMetaData.getTables().
1391: } catch (Exception e) {
1392: throw new SQLException(
1393: Translate
1394: .get(
1395: "loadbalancer.storedprocedure.failed.on.backend",
1396: new String[] {
1397: proc
1398: .getSqlShortForm(vdb
1399: .getSqlShortFormLength()),
1400: backend.getName(),
1401: e.getMessage() }));
1402: } finally {
1403: cm.releaseConnectionInAutoCommit(proc, c);
1404: }
1405: if (logger.isDebugEnabled())
1406: logger.debug(Translate.get(
1407: "loadbalancer.storedprocedure.on",
1408: new String[] { String.valueOf(proc.getId()),
1409: backend.getName() }));
1410: return result;
1411: } else { // Inside a transaction
1412: Connection c;
1413: long tid = proc.getTransactionId();
1414:
1415: try {
1416: c = backend
1417: .getConnectionForTransactionAndLazyBeginIfNeeded(
1418: proc, cm);
1419: } catch (UnreachableBackendException e1) {
1420: String msg = Translate.get(
1421: "loadbalancer.backend.disabling.unreachable",
1422: backend.getName());
1423: logger.error(msg);
1424: endUserLogger.error(msg);
1425: disableBackend(backend, true);
1426: throw new SQLException(Translate.get(
1427: "loadbalancer.backend.unreacheable", backend
1428: .getName()));
1429: } catch (NoTransactionStartWhenDisablingException e) {
1430: String msg = Translate.get(
1431: "loadbalancer.backend.is.disabling",
1432: new String[] {
1433: proc.getSqlShortForm(vdb
1434: .getSqlShortFormLength()),
1435: backend.getName() });
1436: logger.error(msg);
1437: throw new SQLException(msg);
1438: }
1439:
1440: // Sanity check
1441: if (c == null)
1442: throw new SQLException(Translate.get(
1443: "loadbalancer.unable.retrieve.connection",
1444: new String[] { String.valueOf(tid),
1445: backend.getName() }));
1446:
1447: // Execute Query
1448: ExecuteUpdateResult result;
1449: try {
1450: result = AbstractLoadBalancer
1451: .executeCallableStatementExecuteUpdateOnBackend(
1452: proc,
1453: backend,
1454: null,
1455: cm
1456: .retrieveConnectionForTransaction(tid));
1457:
1458: // Warning! No way to detect if schema has been modified unless
1459: // we ask the backend again using DatabaseMetaData.getTables().
1460: } catch (Exception e) {
1461: throw new SQLException(
1462: Translate
1463: .get(
1464: "loadbalancer.storedprocedure.failed.on.backend",
1465: new String[] {
1466: proc
1467: .getSqlShortForm(vdb
1468: .getSqlShortFormLength()),
1469: backend.getName(),
1470: e.getMessage() }));
1471: }
1472: if (logger.isDebugEnabled())
1473: logger.debug(Translate.get(
1474: "loadbalancer.execute.transaction.on",
1475: new String[] { String.valueOf(tid),
1476: String.valueOf(proc.getId()),
1477: backend.getName() }));
1478: return result;
1479: }
1480: }
1481:
1482: /**
1483: * Execute a stored procedure that return multiple results on the selected
1484: * backend.
1485: *
1486: * @param proc the stored procedure to execute
1487: * @param backend the backend that will execute the request
1488: * @param metadataCache MetadataCache (null if none)
1489: * @return an <code>ExecuteResult</code> object
1490: * @throws SQLException if an error occurs
1491: */
1492: private ExecuteResult executeCallableStatementExecuteOnBackend(
1493: StoredProcedure proc, DatabaseBackend backend,
1494: MetadataCache metadataCache) throws SQLException,
1495: UnreachableBackendException {
1496: // Ok, we have a backend, let's execute the request
1497: AbstractConnectionManager cm = backend
1498: .getConnectionManager(proc.getLogin());
1499:
1500: // Sanity check
1501: if (cm == null) {
1502: String msg = Translate
1503: .get("loadbalancer.connectionmanager.not.found",
1504: new String[] { proc.getLogin(),
1505: backend.getName() });
1506: logger.error(msg);
1507: throw new SQLException(msg);
1508: }
1509:
1510: // Execute the query
1511: if (proc.isAutoCommit()) {
1512: // Use a connection just for this request
1513: PooledConnection c = null;
1514: try {
1515: c = cm.retrieveConnectionInAutoCommit(proc);
1516: } catch (UnreachableBackendException e1) {
1517: String msg = Translate.get(
1518: "loadbalancer.backend.disabling.unreachable",
1519: backend.getName());
1520: logger.error(msg);
1521: endUserLogger.error(msg);
1522: disableBackend(backend, true);
1523: throw new UnreachableBackendException(Translate.get(
1524: "loadbalancer.backend.unreacheable", backend
1525: .getName()));
1526: }
1527:
1528: // Sanity check
1529: if (c == null)
1530: throw new UnreachableBackendException(Translate.get(
1531: "loadbalancer.backend.no.connection", backend
1532: .getName()));
1533:
1534: // Execute Query
1535: ExecuteResult rs = null;
1536: try {
1537: rs = AbstractLoadBalancer
1538: .executeCallableStatementExecuteOnBackend(proc,
1539: backend, null, c, metadataCache);
1540: } catch (Exception e) {
1541: throw new SQLException(
1542: Translate
1543: .get(
1544: "loadbalancer.storedprocedure.failed.on.backend",
1545: new String[] {
1546: proc
1547: .getSqlShortForm(vdb
1548: .getSqlShortFormLength()),
1549: backend.getName(),
1550: e.getMessage() }));
1551: } finally {
1552: cm.releaseConnectionInAutoCommit(proc, c);
1553: }
1554: if (logger.isDebugEnabled())
1555: logger.debug(Translate.get(
1556: "loadbalancer.storedprocedure.on",
1557: new String[] { String.valueOf(proc.getId()),
1558: backend.getName() }));
1559: return rs;
1560: } else { // Inside a transaction
1561: Connection c;
1562: long tid = proc.getTransactionId();
1563:
1564: try {
1565: c = backend
1566: .getConnectionForTransactionAndLazyBeginIfNeeded(
1567: proc, cm);
1568: } catch (UnreachableBackendException e1) {
1569: String msg = Translate.get(
1570: "loadbalancer.backend.disabling.unreachable",
1571: backend.getName());
1572: logger.error(msg);
1573: endUserLogger.error(msg);
1574: disableBackend(backend, true);
1575: throw new SQLException(Translate.get(
1576: "loadbalancer.backend.unreacheable", backend
1577: .getName()));
1578: } catch (NoTransactionStartWhenDisablingException e) {
1579: String msg = Translate.get(
1580: "loadbalancer.backend.is.disabling",
1581: new String[] {
1582: proc.getSqlShortForm(vdb
1583: .getSqlShortFormLength()),
1584: backend.getName() });
1585: logger.error(msg);
1586: throw new SQLException(msg);
1587: }
1588:
1589: // Sanity check
1590: if (c == null)
1591: throw new SQLException(Translate.get(
1592: "loadbalancer.unable.retrieve.connection",
1593: new String[] { String.valueOf(tid),
1594: backend.getName() }));
1595:
1596: // Execute Query
1597: ExecuteResult rs;
1598: try {
1599: rs = AbstractLoadBalancer
1600: .executeCallableStatementExecuteOnBackend(
1601: proc,
1602: backend,
1603: null,
1604: cm
1605: .retrieveConnectionForTransaction(tid),
1606: metadataCache);
1607: } catch (Exception e) {
1608: throw new SQLException(
1609: Translate
1610: .get(
1611: "loadbalancer.storedprocedure.failed.on.backend",
1612: new String[] {
1613: proc
1614: .getSqlShortForm(vdb
1615: .getSqlShortFormLength()),
1616: backend.getName(),
1617: e.getMessage() }));
1618: }
1619: if (logger.isDebugEnabled())
1620: logger.debug(Translate.get(
1621: "loadbalancer.execute.transaction.on",
1622: new String[] { String.valueOf(tid),
1623: String.valueOf(proc.getId()),
1624: backend.getName() }));
1625: return rs;
1626: }
1627: }
1628:
1629: /**
1630: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
1631: */
1632: public ControllerResultSet getPreparedStatementGetMetaData(
1633: AbstractRequest request) throws SQLException {
1634: DatabaseBackend backend = chooseBackendForReadRequest(request);
1635:
1636: if (backend == null)
1637: throw new NoMoreBackendException(Translate.get(
1638: "loadbalancer.execute.no.backend.enabled", request
1639: .getId()));
1640:
1641: // Ok, we have a backend, let's execute the request
1642: AbstractConnectionManager cm = backend
1643: .getConnectionManager(request.getLogin());
1644:
1645: // Sanity check
1646: if (cm == null) {
1647: String msg = Translate.get(
1648: "loadbalancer.connectionmanager.not.found",
1649: new String[] { request.getLogin(),
1650: backend.getName() });
1651: logger.error(msg);
1652: throw new SQLException(msg);
1653: }
1654:
1655: // Execute the query
1656: if (request.isAutoCommit()) {
1657: ControllerResultSet rs = null;
1658: boolean badConnection;
1659: do {
1660: badConnection = false;
1661: // Use a connection just for this request
1662: PooledConnection c = null;
1663: try {
1664: c = cm.retrieveConnectionInAutoCommit(request);
1665: } catch (UnreachableBackendException e1) {
1666: String msg = Translate
1667: .get(
1668: "loadbalancer.backend.disabling.unreachable",
1669: backend.getName());
1670: logger.error(msg);
1671: endUserLogger.error(msg);
1672: disableBackend(backend, true);
1673: // Retry on a different backend
1674: return getPreparedStatementGetMetaData(request);
1675: }
1676:
1677: // Sanity check
1678: if (c == null)
1679: throw new SQLException(Translate.get(
1680: "loadbalancer.backend.no.connection",
1681: backend.getName()));
1682:
1683: // Execute Query
1684: try {
1685: rs = preparedStatementGetMetaDataOnBackend(request
1686: .getSqlOrTemplate(), backend, c
1687: .getConnection());
1688: cm.releaseConnectionInAutoCommit(request, c);
1689: } catch (SQLException e) {
1690: cm.releaseConnectionInAutoCommit(request, c);
1691: throw SQLExceptionFactory
1692: .getSQLException(
1693: e,
1694: Translate
1695: .get(
1696: "loadbalancer.request.failed.on.backend",
1697: new String[] {
1698: request
1699: .getSqlShortForm(vdb
1700: .getSqlShortFormLength()),
1701: backend
1702: .getName(),
1703: e
1704: .getMessage() }));
1705: } catch (BadConnectionException e) { // Get rid of the bad connection
1706: cm.deleteConnection(c);
1707: badConnection = true;
1708: } catch (Throwable e) {
1709: cm.releaseConnectionInAutoCommit(request, c);
1710: throw new SQLException(
1711: Translate
1712: .get(
1713: "loadbalancer.request.failed.on.backend",
1714: new String[] {
1715: request
1716: .getSqlShortForm(vdb
1717: .getSqlShortFormLength()),
1718: backend.getName(),
1719: e.getMessage() }));
1720: }
1721: } while (badConnection);
1722: if (logger.isDebugEnabled())
1723: logger.debug(Translate.get("loadbalancer.execute.on",
1724: new String[] { String.valueOf(request.getId()),
1725: backend.getName() }));
1726: return rs;
1727: } else { // Inside a transaction
1728: Connection c;
1729: long tid = request.getTransactionId();
1730:
1731: try {
1732: c = backend
1733: .getConnectionForTransactionAndLazyBeginIfNeeded(
1734: request, cm);
1735: } catch (UnreachableBackendException e1) {
1736: String msg = Translate.get(
1737: "loadbalancer.backend.disabling.unreachable",
1738: backend.getName());
1739: logger.error(msg);
1740: endUserLogger.error(msg);
1741: disableBackend(backend, true);
1742: throw new SQLException(Translate.get(
1743: "loadbalancer.backend.unreacheable", backend
1744: .getName()));
1745: } catch (NoTransactionStartWhenDisablingException e) {
1746: String msg = Translate.get(
1747: "loadbalancer.backend.is.disabling",
1748: new String[] {
1749: request.getSqlShortForm(vdb
1750: .getSqlShortFormLength()),
1751: backend.getName() });
1752: logger.error(msg);
1753: throw new SQLException(msg);
1754: }
1755:
1756: // Sanity check
1757: if (c == null)
1758: throw new SQLException(Translate.get(
1759: "loadbalancer.unable.retrieve.connection",
1760: new String[] { String.valueOf(tid),
1761: backend.getName() }));
1762:
1763: // Execute Query
1764: ControllerResultSet rs = null;
1765: try {
1766: rs = preparedStatementGetMetaDataOnBackend(request
1767: .getSqlOrTemplate(), backend, c);
1768: } catch (SQLException e) {
1769: throw e;
1770: } catch (BadConnectionException e) { // Connection failed, so did the transaction
1771: // Disable the backend.
1772: cm.deleteConnection(tid);
1773: String msg = Translate
1774: .get(
1775: "loadbalancer.backend.disabling.connection.failure",
1776: backend.getName());
1777: logger.error(msg);
1778: endUserLogger.error(msg);
1779: disableBackend(backend, true);
1780: throw new SQLException(msg);
1781: } catch (Throwable e) {
1782: throw new SQLException(Translate.get(
1783: "loadbalancer.request.failed.on.backend",
1784: new String[] {
1785: request.getSqlShortForm(vdb
1786: .getSqlShortFormLength()),
1787: backend.getName(), e.getMessage() }));
1788: }
1789: if (logger.isDebugEnabled())
1790: logger.debug(Translate.get(
1791: "loadbalancer.execute.transaction.on",
1792: new String[] { String.valueOf(tid),
1793: String.valueOf(request.getId()),
1794: backend.getName() }));
1795: return rs;
1796: }
1797: }
1798:
1799: //
1800: // Transaction Management
1801: //
1802:
1803: /**
1804: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1805: */
1806: public void abort(TransactionMetaData tm) throws SQLException {
1807: rollback(tm);
1808: }
1809:
1810: /**
1811: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#begin(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1812: */
1813: public void begin(TransactionMetaData tm) throws SQLException {
1814: Long lTid = new Long(tm.getTransactionId());
1815: if (backendPerTransactionId.containsKey(lTid))
1816: throw new SQLException(Translate.get(
1817: "loadbalancer.transaction.already.started", lTid
1818: .toString()));
1819:
1820: DatabaseBackend backend = chooseBackendForReadRequest(new UnknownReadRequest(
1821: "begin", false, 0, "\n"));
1822: backendPerTransactionId.put(lTid, backend);
1823: backend.startTransaction(lTid);
1824: }
1825:
1826: /**
1827: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#commit(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1828: */
1829: public void commit(TransactionMetaData tm) throws SQLException {
1830: long tid = tm.getTransactionId();
1831: Long lTid = new Long(tid);
1832: DatabaseBackend db = (DatabaseBackend) backendPerTransactionId
1833: .remove(lTid);
1834:
1835: AbstractConnectionManager cm = db.getConnectionManager(tm
1836: .getLogin());
1837: PooledConnection pc = cm.retrieveConnectionForTransaction(tid);
1838:
1839: long logId = 0;
1840: // Log the request
1841: if (recoveryLog != null)
1842: logId = recoveryLog.logCommit(tm);
1843:
1844: // Sanity check
1845: if (pc == null) { // Bad connection
1846: db.stopTransaction(lTid);
1847:
1848: throw new SQLException(Translate.get(
1849: "loadbalancer.unable.retrieve.connection",
1850: new String[] { String.valueOf(tid), db.getName() }));
1851: }
1852:
1853: // Execute Query
1854: try {
1855: Connection c = pc.getConnection();
1856: c.commit();
1857: c.setAutoCommit(true);
1858: } catch (Exception e) {
1859: // Notify failure in recovery log
1860: if (recoveryLog != null)
1861: recoveryLog.logRequestCompletion(logId, false, 0);
1862:
1863: String msg = Translate.get("loadbalancer.commit.failed",
1864: new String[] { String.valueOf(tid), db.getName(),
1865: e.getMessage() });
1866: logger.error(msg);
1867: throw new SQLException(msg);
1868: } finally {
1869: cm.releaseConnectionForTransaction(tid);
1870: db.stopTransaction(lTid);
1871: }
1872: }
1873:
1874: /**
1875: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#rollback(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1876: */
1877: public void rollback(TransactionMetaData tm) throws SQLException {
1878: long tid = tm.getTransactionId();
1879: Long lTid = new Long(tid);
1880: DatabaseBackend db = (DatabaseBackend) backendPerTransactionId
1881: .remove(lTid);
1882:
1883: AbstractConnectionManager cm = db.getConnectionManager(tm
1884: .getLogin());
1885: PooledConnection pc = cm.retrieveConnectionForTransaction(tid);
1886:
1887: long logId = 0;
1888: // Log the request
1889: if (recoveryLog != null)
1890: logId = recoveryLog.logRollback(tm);
1891:
1892: // Sanity check
1893: if (pc == null) { // Bad connection
1894: db.stopTransaction(lTid);
1895:
1896: throw new SQLException(Translate.get(
1897: "loadbalancer.unable.retrieve.connection",
1898: new String[] { String.valueOf(tid), db.getName() }));
1899: }
1900:
1901: // Execute Query
1902: try {
1903: Connection c = pc.getConnection();
1904: c.rollback();
1905:
1906: c.setAutoCommit(true);
1907: } catch (Exception e) {
1908: // Notify failure in recovery log
1909: if (recoveryLog != null)
1910: recoveryLog.logRequestCompletion(logId, false, 0);
1911:
1912: String msg = Translate.get("loadbalancer.rollback.failed",
1913: new String[] { String.valueOf(tid), db.getName(),
1914: e.getMessage() });
1915: logger.error(msg);
1916: throw new SQLException(msg);
1917: } finally {
1918: cm.releaseConnectionForTransaction(tid);
1919: db.stopTransaction(lTid);
1920: }
1921: }
1922:
1923: /**
1924: * Rollback a transaction to a savepoint
1925: *
1926: * @param tm The transaction marker metadata
1927: * @param savepointName The name of the savepoint
1928: * @throws SQLException if an error occurs
1929: */
1930: public void rollbackToSavepoint(TransactionMetaData tm,
1931: String savepointName) throws SQLException {
1932: long tid = tm.getTransactionId();
1933: Long lTid = new Long(tid);
1934: DatabaseBackend db = (DatabaseBackend) backendPerTransactionId
1935: .remove(lTid);
1936:
1937: AbstractConnectionManager cm = db.getConnectionManager(tm
1938: .getLogin());
1939: PooledConnection c = cm.retrieveConnectionForTransaction(tid);
1940:
1941: long logId = 0;
1942: // Log the request
1943: if (recoveryLog != null)
1944: logId = recoveryLog.logRollbackToSavepoint(tm,
1945: savepointName);
1946:
1947: // Sanity check
1948: if (c == null) { // Bad connection
1949: db.stopTransaction(lTid);
1950:
1951: throw new SQLException(Translate.get(
1952: "loadbalancer.unable.retrieve.connection",
1953: new String[] { String.valueOf(tid), db.getName() }));
1954: }
1955:
1956: // Retrieve savepoint
1957: Savepoint savepoint = db.getSavepoint(lTid, savepointName);
1958: if (savepoint == null) {
1959: throw new SQLException(Translate.get(
1960: "loadbalancer.unable.retrieve.savepoint",
1961: new String[] { savepointName, String.valueOf(tid),
1962: db.getName() }));
1963: }
1964:
1965: // Execute Query
1966: try {
1967: c.getConnection().rollback(savepoint);
1968: } catch (Exception e) {
1969: // Notify failure in recovery log
1970: if (recoveryLog != null)
1971: recoveryLog.logRequestCompletion(logId, false, 0);
1972:
1973: String msg = Translate.get(
1974: "loadbalancer.rollbacksavepoint.failed",
1975: new String[] { savepointName, String.valueOf(tid),
1976: db.getName(), e.getMessage() });
1977: logger.error(msg);
1978: throw new SQLException(msg);
1979: }
1980: }
1981:
1982: /**
1983: * Release a savepoint from a transaction
1984: *
1985: * @param tm The transaction marker metadata
1986: * @param savepointName The name of the savepoint ro release
1987: * @throws SQLException if an error occurs
1988: */
1989: public void releaseSavepoint(TransactionMetaData tm,
1990: String savepointName) throws SQLException {
1991: long tid = tm.getTransactionId();
1992: Long lTid = new Long(tid);
1993:
1994: DatabaseBackend db = (DatabaseBackend) backendPerTransactionId
1995: .get(lTid);
1996: AbstractConnectionManager cm = db.getConnectionManager(tm
1997: .getLogin());
1998: PooledConnection c = cm.retrieveConnectionForTransaction(tid);
1999:
2000: long logId = 0;
2001: // Log the request
2002: if (recoveryLog != null)
2003: logId = recoveryLog.logReleaseSavepoint(tm, savepointName);
2004:
2005: // Sanity check
2006: if (c == null) { // Bad connection
2007: db.stopTransaction(lTid);
2008:
2009: throw new SQLException(Translate.get(
2010: "loadbalancer.unable.retrieve.connection",
2011: new String[] { String.valueOf(tid), db.getName() }));
2012: }
2013:
2014: // Retrieve savepoint
2015: Savepoint savepoint = db.getSavepoint(lTid, savepointName);
2016: if (savepoint == null) {
2017: throw new SQLException(Translate.get(
2018: "loadbalancer.unable.retrieve.savepoint",
2019: new String[] { String.valueOf(tid), savepointName,
2020: db.getName() }));
2021: }
2022:
2023: // Execute Query
2024: try {
2025: c.getConnection().releaseSavepoint(savepoint);
2026: } catch (Exception e) {
2027: // Notify failure in recovery log
2028: if (recoveryLog != null)
2029: recoveryLog.logRequestCompletion(logId, false, 0);
2030:
2031: String msg = Translate.get(
2032: "loadbalancer.releasesavepoint.failed",
2033: new String[] { savepointName, String.valueOf(tid),
2034: db.getName(), e.getMessage() });
2035: logger.error(msg);
2036: throw new SQLException(msg);
2037: } finally {
2038: db.removeSavepoint(lTid, savepoint);
2039: }
2040: }
2041:
2042: /**
2043: * Set a savepoint to a transaction.
2044: *
2045: * @param tm The transaction marker metadata
2046: * @param savepointName The name of the new savepoint
2047: * @throws SQLException if an error occurs
2048: */
2049: public void setSavepoint(TransactionMetaData tm,
2050: String savepointName) throws SQLException {
2051: long tid = tm.getTransactionId();
2052: Long lTid = new Long(tid);
2053:
2054: DatabaseBackend db = (DatabaseBackend) backendPerTransactionId
2055: .get(lTid);
2056: AbstractConnectionManager cm = db.getConnectionManager(tm
2057: .getLogin());
2058: PooledConnection c = cm.retrieveConnectionForTransaction(tid);
2059:
2060: long logId = 0;
2061: // Log the request
2062: if (recoveryLog != null)
2063: logId = recoveryLog.logSetSavepoint(tm, savepointName);
2064:
2065: // Sanity check
2066: if (c == null) { // Bad connection
2067: db.stopTransaction(lTid);
2068:
2069: throw new SQLException(Translate.get(
2070: "loadbalancer.unable.retrieve.connection",
2071: new String[] { String.valueOf(tid), db.getName() }));
2072: }
2073:
2074: // Execute Query
2075: Savepoint savepoint = null;
2076: try {
2077: savepoint = c.getConnection().setSavepoint(savepointName);
2078: } catch (Exception e) {
2079: // Notify failure in recovery log
2080: if (recoveryLog != null)
2081: recoveryLog.logRequestCompletion(logId, false, 0);
2082:
2083: String msg = Translate.get(
2084: "loadbalancer.setsavepoint.failed", new String[] {
2085: savepointName, String.valueOf(tid),
2086: db.getName(), e.getMessage() });
2087: logger.error(msg);
2088: throw new SQLException(msg);
2089: } finally {
2090: if (savepoint != null)
2091: db.addSavepoint(lTid, savepoint);
2092: }
2093: }
2094:
2095: /**
2096: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
2097: * long)
2098: */
2099: public void closePersistentConnection(String login,
2100: long persistentConnectionId) throws SQLException {
2101: try {
2102: vdb.acquireReadLockBackendLists();
2103: } catch (InterruptedException e) {
2104: String msg = Translate.get(
2105: "loadbalancer.backendlist.acquire.readlock.failed",
2106: e);
2107: logger.error(msg);
2108: }
2109: int size = vdb.getBackends().size();
2110: List backends = vdb.getBackends();
2111: for (int i = 0; i < size; i++) {
2112: DatabaseBackend backend = (DatabaseBackend) backends.get(i);
2113: AbstractConnectionManager cm = backend
2114: .getConnectionManager(login);
2115: if (cm != null) {
2116: cm
2117: .releasePersistentConnectionInAutoCommit(persistentConnectionId);
2118: backend
2119: .removePersistentConnection(persistentConnectionId);
2120: }
2121: }
2122: vdb.releaseReadLockBackendLists();
2123: }
2124:
2125: /**
2126: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
2127: * long)
2128: */
2129: public void openPersistentConnection(String login,
2130: long persistentConnectionId) throws SQLException {
2131: // Fake request to call the method that creates persistent connections
2132: AbstractRequest request = new UnknownReadRequest("", false, 0,
2133: "");
2134: request.setLogin(login);
2135: request.setPersistentConnection(true);
2136: request.setPersistentConnectionId(persistentConnectionId);
2137:
2138: try {
2139: vdb.acquireReadLockBackendLists();
2140: } catch (InterruptedException e) {
2141: String msg = Translate.get(
2142: "loadbalancer.backendlist.acquire.readlock.failed",
2143: e);
2144: logger.error(msg);
2145: }
2146: int size = vdb.getBackends().size();
2147: List backends = vdb.getBackends();
2148: for (int i = 0; i < size; i++) {
2149: DatabaseBackend backend = (DatabaseBackend) backends.get(i);
2150: AbstractConnectionManager cm = backend
2151: .getConnectionManager(login);
2152: if (cm == null) {
2153: logger.warn("Failed to open persistent connection "
2154: + persistentConnectionId + " on backend "
2155: + backend.getName());
2156: continue;
2157: }
2158: try {
2159: // Create the persistent connection
2160: PooledConnection c = cm
2161: .retrieveConnectionInAutoCommit(request);
2162: backend.addPersistentConnection(request
2163: .getPersistentConnectionId(), c);
2164: } catch (UnreachableBackendException e) {
2165: logger.warn("Failed to open persistent connection "
2166: + persistentConnectionId + " on backend "
2167: + backend.getName(), e);
2168: }
2169: }
2170: vdb.releaseReadLockBackendLists();
2171: }
2172:
2173: /**
2174: * Enables a backend that was previously disabled. Asks the corresponding
2175: * connection manager to initialize the connections if needed.
2176: * <p>
2177: * No sanity checks are performed by this function.
2178: *
2179: * @param db the database backend to enable
2180: * @param writeEnabled True if the backend must be enabled for writes
2181: * @throws SQLException if an error occurs
2182: */
2183: public void enableBackend(DatabaseBackend db, boolean writeEnabled)
2184: throws SQLException {
2185: logger.info(Translate.get("loadbalancer.backend.enabling", db
2186: .getName()));
2187: if (!db.isInitialized())
2188: db.initializeConnections();
2189: db.enableRead();
2190: if (writeEnabled)
2191: db.enableWrite();
2192: numberOfEnabledBackends++;
2193: }
2194:
2195: /**
2196: * Disables a backend that was previously enabled. Asks the corresponding
2197: * connection manager to finalize the connections if needed.
2198: * <p>
2199: * No sanity checks are performed by this function.
2200: *
2201: * @param db the database backend to disable
2202: * @param forceDisable true if disabling must be forced on the backend
2203: * @throws SQLException if an error occurs
2204: */
2205: public void disableBackend(DatabaseBackend db, boolean forceDisable)
2206: throws SQLException {
2207: logger.info(Translate.get("loadbalancer.backend.disabling", db
2208: .getName()));
2209: numberOfEnabledBackends--;
2210: db.disable();
2211: if (db.isInitialized())
2212: db.finalizeConnections();
2213: }
2214:
2215: /**
2216: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getNumberOfEnabledBackends()
2217: */
2218: public int getNumberOfEnabledBackends() {
2219: return numberOfEnabledBackends;
2220: }
2221:
2222: //
2223: // Debug/Monitoring
2224: //
2225:
2226: /**
2227: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2228: */
2229: public String getXmlImpl() {
2230: StringBuffer info = new StringBuffer();
2231: info.append("<" + DatabasesXmlTags.ELT_ParallelDB + ">");
2232: info.append(getParallelDBXml());
2233: info.append("</" + DatabasesXmlTags.ELT_ParallelDB + ">");
2234: return info.toString();
2235: }
2236:
2237: /**
2238: * Return the XML tags of the ParallelDB load balancer implementation.
2239: *
2240: * @return content of ParallelDB xml
2241: */
2242: public abstract String getParallelDBXml();
2243:
2244: }
|