0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Vadim Kassin, Jaco Swart, Jean-Bernard van Zuylen
0023: */package org.continuent.sequoia.controller.loadbalancer;
0024:
0025: import java.sql.CallableStatement;
0026: import java.sql.Connection;
0027: import java.sql.PreparedStatement;
0028: import java.sql.ResultSet;
0029: import java.sql.SQLException;
0030: import java.sql.SQLWarning;
0031: import java.sql.Statement;
0032: import java.util.ArrayList;
0033: import java.util.Iterator;
0034: import java.util.LinkedList;
0035: import java.util.List;
0036:
0037: import org.continuent.sequoia.common.exceptions.BadConnectionException;
0038: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0039: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0040: import org.continuent.sequoia.common.i18n.Translate;
0041: import org.continuent.sequoia.common.locks.ReadPrioritaryFIFOWriteLock;
0042: import org.continuent.sequoia.common.log.Trace;
0043: import org.continuent.sequoia.common.protocol.PreparedStatementSerialization;
0044: import org.continuent.sequoia.common.sql.filters.MacrosHandler;
0045: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0046: import org.continuent.sequoia.common.xml.XmlComponent;
0047: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0048: import org.continuent.sequoia.controller.backend.DriverCompliance;
0049: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0050: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0051: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0052: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0053: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0054: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0055: import org.continuent.sequoia.controller.connection.PooledConnection;
0056: import org.continuent.sequoia.controller.core.ControllerConstants;
0057: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0058: import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0059: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0060: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0061: import org.continuent.sequoia.controller.requests.AbstractRequest;
0062: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0063: import org.continuent.sequoia.controller.requests.CreateRequest;
0064: import org.continuent.sequoia.controller.requests.SelectRequest;
0065: import org.continuent.sequoia.controller.requests.StoredProcedure;
0066: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0067: import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendWritesMessage;
0068:
0069: /**
0070: * The Request Load Balancer should implement the load balancing of the requests
0071: * among the backend nodes.
0072: * <p>
0073: * The requests comes from the Request Controller and are sent to the Connection
0074: * Managers.
0075: *
0076: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0077: * @author <a href="mailto:vadim@kase.kz">Vadim Kassin </a>
0078: * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
0079: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0080: * </a>
0081: * @version 1.0
0082: */
0083: public abstract class AbstractLoadBalancer implements XmlComponent {
0084:
//
// How is the code organized?
//
// 1. Member variables/Constructor
// 2. Getter/Setter (possibly in alphabetical order)
// 3. Utility functions
// 4. Request handling
// 5. Transaction management
// 6. Backend management
// 7. Debug/Monitoring
//
0095:
0096: // Virtual Database this load balancer is attached to.
0097: protected VirtualDatabase vdb;
0098: protected RecoveryLog recoveryLog;
0099: protected int raidbLevel;
0100: protected int parsingGranularity;
0101: /** Reference to distributed virtual database total order queue */
0102: protected LinkedList totalOrderQueue;
0103:
0104: protected MacrosHandler macroHandler;
0105:
0106: /**
0107: * List of enabled backends (includes backends in either ENABLED or DISABLING
0108: * state).
0109: *
0110: * @see org.continuent.sequoia.common.jmx.management.BackendState
0111: */
0112: protected ArrayList enabledBackends;
0113: protected ReadPrioritaryFIFOWriteLock backendListLock = new ReadPrioritaryFIFOWriteLock();
0114:
0115: /** Should we wait for all backends to commit before returning ? */
0116: public WaitForCompletionPolicy waitForCompletionPolicy;
0117:
0118: private static int defaultTransactionIsolationLevel;
0119:
0120: protected static Trace logger = Trace
0121: .getLogger("org.continuent.sequoia.controller.loadbalancer");
0122:
0123: /**
0124: * Generic constructor that sets some member variables and checks that
0125: * backends are in the disabled state
0126: *
0127: * @param vdb The virtual database this load balancer belongs to
0128: * @param raidbLevel The RAIDb level of this load balancer
0129: * @param parsingGranularity The parsing granularity needed by this load
0130: * balancer
0131: */
0132: protected AbstractLoadBalancer(VirtualDatabase vdb, int raidbLevel,
0133: int parsingGranularity) throws SQLException {
0134: this .raidbLevel = raidbLevel;
0135: this .parsingGranularity = parsingGranularity;
0136: this .vdb = vdb;
0137: this .totalOrderQueue = vdb.getTotalOrderQueue();
0138: this .enabledBackends = new ArrayList();
0139: try {
0140: vdb.acquireReadLockBackendLists();
0141: } catch (InterruptedException e) {
0142: String msg = Translate.get(
0143: "loadbalancer.backendlist.acquire.readlock.failed",
0144: e);
0145: logger.error(msg);
0146: throw new SQLException(msg);
0147: }
0148: int size = vdb.getBackends().size();
0149: ArrayList backends = vdb.getBackends();
0150: for (int i = 0; i < size; i++) {
0151: DatabaseBackend backend = (DatabaseBackend) backends.get(i);
0152: if (backend.isReadEnabled() || backend.isWriteEnabled()) {
0153: if (logger.isWarnEnabled())
0154: logger
0155: .warn(Translate
0156: .get(
0157: "loadbalancer.constructor.backends.not.disabled",
0158: backend.getName()));
0159: try {
0160: disableBackend(backend, true);
0161: } catch (Exception e) { // Set the disabled state anyway
0162: backend.disable();
0163: }
0164: }
0165: }
0166: vdb.releaseReadLockBackendLists();
0167: }
0168:
0169: //
0170: // Getter/Setter methods
0171: //
0172:
0173: /**
0174: * Returns the defaultTransactionIsolationLevel value.
0175: *
0176: * @return Returns the defaultTransactionIsolationLevel.
0177: */
0178: public static final int getDefaultTransactionIsolationLevel() {
0179: return defaultTransactionIsolationLevel;
0180: }
0181:
0182: /**
0183: * Sets the defaultTransactionIsolationLevel value.
0184: *
0185: * @param defaultTransactionIsolationLevel The
0186: * defaultTransactionIsolationLevel to set.
0187: */
0188: public final void setDefaultTransactionIsolationLevel(
0189: int defaultTransactionIsolationLevel) {
0190: AbstractLoadBalancer.defaultTransactionIsolationLevel = defaultTransactionIsolationLevel;
0191: }
0192:
0193: /**
0194: * This sets the macro handler for this load balancer. Handling macros
0195: * prevents different backends to generate different values when interpreting
0196: * the macros which could result in data inconsitencies.
0197: *
0198: * @param handler <code>MacrosHandler</code> instance
0199: */
0200: public void setMacroHandler(MacrosHandler handler) {
0201: this .macroHandler = handler;
0202: }
0203:
0204: /**
0205: * Get the needed query parsing granularity.
0206: *
0207: * @return needed query parsing granularity
0208: */
0209: public int getParsingGranularity() {
0210: return parsingGranularity;
0211: }
0212:
0213: /**
0214: * Returns the RAIDbLevel.
0215: *
0216: * @return int the RAIDb level
0217: */
0218: public int getRAIDbLevel() {
0219: return raidbLevel;
0220: }
0221:
0222: /**
0223: * Returns the recoveryLog value.
0224: *
0225: * @return Returns the recoveryLog.
0226: */
0227: public final RecoveryLog getRecoveryLog() {
0228: return recoveryLog;
0229: }
0230:
0231: /**
0232: * Sets the recoveryLog value.
0233: *
0234: * @param recoveryLog The recoveryLog to set.
0235: */
0236: public final void setRecoveryLog(RecoveryLog recoveryLog) {
0237: this .recoveryLog = recoveryLog;
0238: }
0239:
0240: /**
0241: * Associate a weight to a backend identified by its logical name.
0242: *
0243: * @param name the backend name
0244: * @param w the weight
0245: * @throws SQLException if an error occurs
0246: */
0247: public void setWeight(String name, int w) throws SQLException {
0248: throw new SQLException(
0249: "Weight is not supported by this load balancer");
0250: }
0251:
0252: //
0253: // Utility functions
0254: //
0255:
0256: /**
0257: * Acquire the given lock and check the number of threads. Throw a
0258: * NoMoreBackendException if no thread is available else returns the number of
0259: * threads.
0260: *
0261: * @param request object to remove from the total order queue in case no
0262: * backend is available
0263: * @param requestDescription description of the request to put in the error
0264: * message in case of an error
0265: * @return the number of threads in the acquired list
0266: * @throws SQLException if there was a problem to acquire the lock on the
0267: * enabled backend list
0268: * @throws NoMoreBackendException if no backends are available anymore
0269: */
0270: protected int acquireLockAndCheckNbOfThreads(Object request,
0271: String requestDescription) throws SQLException,
0272: NoMoreBackendException {
0273: try {
0274: backendListLock.acquireRead();
0275: } catch (InterruptedException e) {
0276: String msg = Translate.get(
0277: "loadbalancer.backendlist.acquire.readlock.failed",
0278: e);
0279: logger.error(msg);
0280: throw new SQLException(msg);
0281: }
0282:
0283: int nbOfThreads = enabledBackends.size();
0284: if (nbOfThreads == 0) {
0285: releaseLockAndUnlockNextQuery(request);
0286: throw new NoMoreBackendException(Translate
0287: .get("loadbalancer.backendlist.empty"));
0288: } else {
0289: if (logger.isDebugEnabled())
0290: logger.debug(Translate.get(
0291: "loadbalancer.execute.on.several",
0292: new String[] { requestDescription,
0293: String.valueOf(nbOfThreads) }));
0294: }
0295: return nbOfThreads;
0296: }
0297:
0298: /**
0299: * Returns the number of nodes to wait for according to the defined
0300: * <code>waitForCompletion</code> policy.
0301: *
0302: * @param nbOfThreads total number of threads
0303: * @return int number of threads to wait for
0304: */
0305: protected int getNbToWait(int nbOfThreads) {
0306: int nbToWait;
0307: switch (waitForCompletionPolicy.getPolicy()) {
0308: case WaitForCompletionPolicy.FIRST:
0309: nbToWait = 1;
0310: break;
0311: case WaitForCompletionPolicy.MAJORITY:
0312: nbToWait = nbOfThreads / 2 + 1;
0313: break;
0314: case WaitForCompletionPolicy.ALL:
0315: nbToWait = nbOfThreads;
0316: break;
0317: default:
0318: logger.warn(Translate
0319: .get("loadbalancer.waitforcompletion.unsupported"));
0320: nbToWait = nbOfThreads;
0321: break;
0322: }
0323: return nbToWait;
0324: }
0325:
0326: /**
0327: * Interprets the macros in the request (depending on the
0328: * <code>MacroHandler</code> set for this class) and modify either the
0329: * skeleton or the query itself. Note that the given object is directly
0330: * modified.
0331: *
0332: * @param request the request to process
0333: */
0334: public void handleMacros(AbstractRequest request) {
0335: if (macroHandler == null)
0336: return;
0337:
0338: // Do not handle macros for requests that don't need it.
0339: if (!request.needsMacroProcessing())
0340: return;
0341:
0342: macroHandler.processMacros(request);
0343: }
0344:
0345: /**
0346: * Release the backend list lock and remove the current query from the head of
0347: * the total order queue to unlock the next query.
0348: *
0349: * @param currentQuery the current query to remove from the total order queue
0350: */
0351: protected void releaseLockAndUnlockNextQuery(Object currentQuery) {
0352: backendListLock.releaseRead();
0353:
0354: // Unblock next query from total order queue
0355: removeObjectFromAndNotifyTotalOrderQueue(currentQuery);
0356: }
0357:
0358: /**
0359: * Remove an entry of the total order queue (usually the head) and notify the
0360: * queue so that the next queries can be scheduled.
0361: *
0362: * @param request Object that should be removed from the total order queue
0363: */
0364: public void removeObjectFromAndNotifyTotalOrderQueue(Object request) {
0365: if ((totalOrderQueue != null) && (request != null)) {
0366: synchronized (totalOrderQueue) {
0367: try {
0368: if (totalOrderQueue.remove(request)) {
0369: if (logger.isDebugEnabled())
0370: logger.debug("Removed " + request
0371: + " from total order queue");
0372: totalOrderQueue.notifyAll();
0373: } else if (logger.isDebugEnabled()) {
0374: logger.debug(request
0375: + " was not in the total order queue");
0376: }
0377: } catch (RuntimeException e) {
0378: logger.warn("Unable to remove request " + request
0379: + " from total order queue", e);
0380: }
0381: }
0382: }
0383: }
0384:
0385: /**
0386: * Wait for the completion of the given task. Note that this method must be
0387: * called within a synchronized block on the task.
0388: *
0389: * @param timeout timeout in ms for this task
0390: * @param requestDescription description of the request to put in the error
0391: * message in case of a timeout
0392: * @param task the task to wait for completion
0393: * @throws SQLException if the timeout has expired
0394: */
0395: public static void waitForTaskCompletion(long timeout,
0396: String requestDescription, AbstractTask task)
0397: throws SQLException {
0398: // Wait for completion (notified by the task)
0399: try {
0400: // Wait on task
0401: if (timeout > 0) {
0402: long start = System.currentTimeMillis();
0403: task.wait(timeout);
0404: long end = System.currentTimeMillis();
0405: long remaining = timeout - (end - start);
0406: if (remaining <= 0) {
0407: if (task.setExpiredTimeout()) { // Task will be ignored by all backends
0408: String msg = Translate
0409: .get("loadbalancer.request.timeout",
0410: new String[] {
0411: requestDescription,
0412: String.valueOf(task
0413: .getSuccess()),
0414: String.valueOf(task
0415: .getFailed()) });
0416:
0417: logger.warn(msg);
0418: throw new SQLException(msg);
0419: }
0420: // else task execution already started, to late to cancel
0421: }
0422: // No need to update request timeout since the execution is finished
0423: } else
0424: task.wait();
0425: } catch (InterruptedException e) {
0426: if (task.setExpiredTimeout()) { // Task will be ignored by all backends
0427: String msg = Translate.get(
0428: "loadbalancer.request.timeout", new String[] {
0429: requestDescription,
0430: String.valueOf(task.getSuccess()),
0431: String.valueOf(task.getFailed()) });
0432:
0433: logger.warn(msg);
0434: throw new SQLException(msg);
0435: }
0436: // else task execution already started, to late to cancel
0437: }
0438: }
0439:
0440: /**
0441: * If we are executing in a distributed virtual database, we have to make sure
0442: * that we post the query in the queue following the total order. This method
0443: * does not remove the request from the total order queue. You have to call
0444: * removeHeadFromAndNotifyTotalOrderQueue() to do so.
0445: *
0446: * @param request the request to wait for (can be any object but usually a
0447: * DistributedRequest, Commit or Rollback)
0448: * @param errorIfNotFound true if an error message should be logged if the
0449: * request is not found in the total order queue
0450: * @return true if the element was found and wait has succeeded, false
0451: * otherwise
0452: * @see #removeHeadFromAndNotifyTotalOrderQueue(Object)
0453: */
0454: public boolean waitForTotalOrder(Object request,
0455: boolean errorIfNotFound) {
0456: if (totalOrderQueue != null) {
0457: synchronized (totalOrderQueue) {
0458: int index = totalOrderQueue.indexOf(request);
0459: while (index > 0) {
0460: if (logger.isDebugEnabled())
0461: logger.debug("Waiting for " + index
0462: + " queries to execute (current is "
0463: + totalOrderQueue.get(0) + ")");
0464:
0465: // All suspended requests can be bypassed
0466: boolean foundNonSuspendedRequest = false;
0467: for (int i = 0; i < index; i++) {
0468: if (!vdb.getRequestManager().getScheduler()
0469: .isSuspendedRequest(
0470: totalOrderQueue.get(i))) {
0471: foundNonSuspendedRequest = true;
0472: break;
0473: }
0474: }
0475: if (!foundNonSuspendedRequest) {
0476: index = 0;
0477: break;
0478: }
0479:
0480: try {
0481: totalOrderQueue.wait();
0482: } catch (InterruptedException ignore) {
0483: }
0484: index = totalOrderQueue.indexOf(request);
0485: }
0486: if (index == -1) {
0487: if (errorIfNotFound)
0488: logger
0489: .error("Request was not found in total order queue, posting out of order ("
0490: + request + ")");
0491: return false;
0492: } else
0493: return true;
0494: }
0495: }
0496: return false;
0497: }
0498:
0499: /**
0500: * This will block the given request if there are any suspending task in
0501: * progress (before the request in the total order queue).
0502: *
0503: * @param request the request that we are processing
0504: */
0505: public void waitForSuspendWritesToComplete(AbstractRequest request) {
0506: if (totalOrderQueue != null) {
0507: synchronized (totalOrderQueue) {
0508: boolean hasToWait = true;
0509: while (hasToWait) {
0510: hasToWait = false;
0511: // Checking total order queue to see if there is an
0512: // SuspendWritesMessage before this request.
0513: // If this is the case, this request will have to wait.
0514: for (Iterator iter = totalOrderQueue.iterator(); iter
0515: .hasNext();) {
0516: Object elem = iter.next();
0517: if (elem instanceof SuspendWritesMessage) {
0518: // Found a SuspendWritesMessage, so wait...
0519: hasToWait = true;
0520: break;
0521: } else if (elem instanceof AbstractRequest) {
0522: // Found the request itself, let'go then...
0523: AbstractRequest req = (AbstractRequest) elem;
0524: if (req == request)
0525: break;
0526: }
0527: }
0528: if (hasToWait)
0529: try {
0530: totalOrderQueue.wait();
0531: } catch (InterruptedException ignore) {
0532: }
0533: }
0534: }
0535: }
0536: }
0537:
0538: //
0539: // Request Handling
0540: //
0541:
0542: /**
0543: * Perform a read request. It is up to the implementation to choose to which
0544: * backend node(s) this request should be sent.
0545: *
0546: * @param request an <code>SelectRequest</code>
0547: * @param metadataCache MetadataCache (null if none)
0548: * @return the corresponding <code>ControllerResultSet</code>
0549: * @exception SQLException if an error occurs
0550: * @throws AllBackendsFailedException if all backends failed to execute the
0551: * request
0552: */
0553: public abstract ControllerResultSet statementExecuteQuery(
0554: SelectRequest request, MetadataCache metadataCache)
0555: throws SQLException, AllBackendsFailedException;
0556:
0557: /**
0558: * Perform a write request. This request should usually be broadcasted to all
0559: * nodes.
0560: *
0561: * @param request an <code>AbstractWriteRequest</code>
0562: * @return number of rows affected by the request
0563: * @throws AllBackendsFailedException if all backends failed to execute the
0564: * request
0565: * @exception NoMoreBackendException if no backends are left to execute the
0566: * request
0567: * @exception SQLException if an error occurs
0568: */
0569: public abstract ExecuteUpdateResult statementExecuteUpdate(
0570: AbstractWriteRequest request)
0571: throws AllBackendsFailedException, NoMoreBackendException,
0572: SQLException;
0573:
0574: /**
0575: * Perform a write request and return a ResultSet containing the auto
0576: * generated keys.
0577: *
0578: * @param request an <code>AbstractWriteRequest</code>
0579: * @param metadataCache MetadataCache (null if none)
0580: * @return auto generated keys
0581: * @throws AllBackendsFailedException if all backends failed to execute the
0582: * request
0583: * @exception NoMoreBackendException if no backends are left to execute the
0584: * request
0585: * @exception SQLException if an error occurs
0586: */
0587: public abstract GeneratedKeysResult statementExecuteUpdateWithKeys(
0588: AbstractWriteRequest request, MetadataCache metadataCache)
0589: throws AllBackendsFailedException, NoMoreBackendException,
0590: SQLException;
0591:
0592: /**
0593: * Call a request that returns multiple results.
0594: *
0595: * @param request the request to execute
0596: * @param metadataCache MetadataCache (null if none)
0597: * @return an <code>ExecuteResult</code> object
0598: * @throws AllBackendsFailedException if all backends failed to execute the
0599: * request
0600: * @throws SQLException if an error occurs
0601: */
0602: public abstract ExecuteResult statementExecute(
0603: AbstractRequest request, MetadataCache metadataCache)
0604: throws AllBackendsFailedException, SQLException;
0605:
0606: /**
0607: * Call a read-only stored procedure that returns a ResultSet. The stored
0608: * procedure will be executed by one node only.
0609: *
0610: * @param proc the stored procedure call
0611: * @param metadataCache MetadataCache (null if none)
0612: * @return a <code>ControllerResultSet</code> value
0613: * @exception SQLException if an error occurs
0614: */
0615: public abstract ControllerResultSet readOnlyCallableStatementExecuteQuery(
0616: StoredProcedure proc, MetadataCache metadataCache)
0617: throws SQLException;
0618:
0619: /**
0620: * Call a read-only stored procedure that returns multiple results. The stored
0621: * procedure will be executed by one node only.
0622: *
0623: * @param proc the stored procedure call
0624: * @param metadataCache MetadataCache (null if none)
0625: * @return a <code>ExecuteResult</code> object containing all results
0626: * @exception SQLException if an error occurs
0627: */
0628: public abstract ExecuteResult readOnlyCallableStatementExecute(
0629: StoredProcedure proc, MetadataCache metadataCache)
0630: throws SQLException;
0631:
0632: /**
0633: * Call a stored procedure that returns a ResultSet. This stored procedure can
0634: * possibly perform writes and will therefore be executed by all nodes.
0635: *
0636: * @param proc the stored procedure call
0637: * @param metadataCache MetadataCache (null if none)
0638: * @return a <code>ControllerResultSet</code> value
0639: * @throws AllBackendsFailedException if all backends failed to execute the
0640: * request
0641: * @exception SQLException if an error occurs
0642: */
0643: public abstract ControllerResultSet callableStatementExecuteQuery(
0644: StoredProcedure proc, MetadataCache metadataCache)
0645: throws AllBackendsFailedException, SQLException;
0646:
0647: /**
0648: * Call a stored procedure that performs an update.
0649: *
0650: * @param proc the stored procedure call
0651: * @return number of rows affected
0652: * @throws AllBackendsFailedException if all backends failed to execute the
0653: * request
0654: * @throws SQLException if an error occurs
0655: */
0656: public abstract ExecuteUpdateResult callableStatementExecuteUpdate(
0657: StoredProcedure proc) throws AllBackendsFailedException,
0658: SQLException;
0659:
0660: /**
0661: * Call a stored procedure that returns multiple results.
0662: *
0663: * @param proc the stored procedure call
0664: * @param metadataCache MetadataCache (null if none)
0665: * @return an <code>ExecuteResult</code> object
0666: * @throws AllBackendsFailedException if all backends failed to execute the
0667: * request
0668: * @throws SQLException if an error occurs
0669: */
0670: public abstract ExecuteResult callableStatementExecute(
0671: StoredProcedure proc, MetadataCache metadataCache)
0672: throws AllBackendsFailedException, SQLException;
0673:
0674: /**
0675: * Return a ControllerResultSet containing the PreparedStatement metaData of
0676: * the given sql template
0677: *
0678: * @param request the request containing the sql template
0679: * @return an empty ControllerResultSet with the metadata
0680: * @throws SQLException if a database error occurs
0681: */
0682: public abstract ControllerResultSet getPreparedStatementGetMetaData(
0683: AbstractRequest request) throws SQLException;
0684:
0685: /**
0686: * Setup a Statement or a PreparedStatement (decoded if parameters of the
0687: * request are not null).
0688: */
0689: private static Statement setupStatementOrPreparedStatement(
0690: AbstractRequest request, DatabaseBackend backend,
0691: BackendWorkerThread workerThread, Connection c,
0692: boolean setupResultSetParameters, boolean needGeneratedKeys)
0693: throws SQLException {
0694: Statement s; // Can also be used as a PreparedStatement
0695: if (request.getPreparedStatementParameters() == null)
0696: s = c.createStatement();
0697: else {
0698: String rewrittenTemplate = backend.rewriteQuery(request
0699: .getSqlOrTemplate());
0700: if (needGeneratedKeys)
0701: s = c.prepareStatement(rewrittenTemplate,
0702: Statement.RETURN_GENERATED_KEYS);
0703: else
0704: s = c.prepareStatement(rewrittenTemplate);
0705: PreparedStatementSerialization.setPreparedStatement(request
0706: .getPreparedStatementParameters(),
0707: (PreparedStatement) s);
0708: }
0709:
0710: // Let the worker thread know which statement we are using in case there is
0711: // a need to cancel that statement during its execution
0712: if (workerThread != null)
0713: workerThread.setCurrentStatement(s);
0714:
0715: DriverCompliance driverCompliance = backend
0716: .getDriverCompliance();
0717: if (driverCompliance.supportSetQueryTimeout())
0718: s.setQueryTimeout(request.getTimeout());
0719:
0720: if (setupResultSetParameters) {
0721: if ((request.getCursorName() != null)
0722: && (driverCompliance.supportSetCursorName()))
0723: s.setCursorName(request.getCursorName());
0724: if ((request.getFetchSize() != 0)
0725: && driverCompliance.supportSetFetchSize())
0726: s.setFetchSize(request.getFetchSize());
0727: if ((request.getMaxRows() > 0)
0728: && driverCompliance.supportSetMaxRows())
0729: s.setMaxRows(request.getMaxRows());
0730: }
0731: s.setEscapeProcessing(request.getEscapeProcessing());
0732: return s;
0733: }
0734:
0735: /**
0736: * Execute a statement on a backend. If the execution fails, the connection is
0737: * checked for validity. If the connection was not valid, the query is
0738: * automatically retried on a new connection.<br>
0739: *
0740: * @param request the request to execute
0741: * @param backend the backend on which the request is executed
0742: * @param workerThread the backend worker thread executing this query (or null
0743: * if none)
0744: * @param c connection used to create the statement
0745: * @param metadataCache MetadataCache (null if none)
0746: * @return the ControllerResultSet
0747: * @throws SQLException if an error occurs
0748: * @throws BadConnectionException if the connection was bad
0749: * @throws UnreachableBackendException if the backend is unreachable
0750: */
0751: public static final ControllerResultSet executeStatementExecuteQueryOnBackend(
0752: SelectRequest request, DatabaseBackend backend,
0753: BackendWorkerThread workerThread, Connection c,
0754: MetadataCache metadataCache) throws SQLException,
0755: BadConnectionException, UnreachableBackendException {
0756: ControllerResultSet rs = null;
0757: ResultSet backendRS = null;
0758: try {
0759: backend.addPendingReadRequest(request);
0760:
0761: Statement s = setupStatementOrPreparedStatement(request,
0762: backend, workerThread, c, true, false);
0763:
0764: // Execute the query
0765: if (request.getPreparedStatementParameters() == null)
0766: backendRS = s.executeQuery(backend.rewriteQuery(request
0767: .getSqlOrTemplate()));
0768: else
0769: backendRS = ((PreparedStatement) s).executeQuery();
0770:
0771: SQLWarning stWarns = null;
0772: if (request.getRetrieveSQLWarnings()) {
0773: stWarns = s.getWarnings();
0774: }
0775: rs = new ControllerResultSet(request, backendRS,
0776: metadataCache, s, false);
0777: rs.setStatementWarnings(stWarns);
0778: } catch (SQLException e) { // Something bad happened
0779: /*
0780: * A persistent connection is a direct tie between the client program and
0781: * the backend database. If the connection is broken, it cannot be retried
0782: * on another connection on the same backend, because the client may be
0783: * depending on state that was associated with the broken connection.
0784: */
0785: if (backend.isValidConnection(c))
0786: throw e; // Connection is valid, throw the exception
0787: else if (request.isPersistentConnection())
0788: throw new UnreachableBackendException(
0789: "Bad persistent connection", e);
0790: else
0791: throw new BadConnectionException(e);
0792: } finally {
0793: // we can close this resultset if fetch size was 0
0794: if (backendRS != null && request.getFetchSize() == 0)
0795: try {
0796: backendRS.close();
0797: } catch (SQLException ignore) {
0798: }
0799: backend.removePendingRequest(request);
0800: }
0801: return rs;
0802: }
0803:
0804: /**
0805: * Execute an update prepared statement on a backend. If the execution fails,
0806: * the connection is checked for validity. If the connection was not valid,
0807: * the query is automatically retried on a new connection.
0808: *
0809: * @param request the request to execute
0810: * @param backend the backend on which the request is executed
0811: * @param workerThread the backend worker thread executing this query (or null
0812: * if none)
0813: * @param pc pooled connection used to create the statement
0814: * @return int Number of rows effected
0815: * @throws SQLException if an error occurs
0816: * @throws BadConnectionException if the connection was bad
0817: */
0818: public static final ExecuteUpdateResult executeStatementExecuteUpdateOnBackend(
0819: AbstractWriteRequest request, DatabaseBackend backend,
0820: BackendWorkerThread workerThread, PooledConnection pc)
0821: throws SQLException, BadConnectionException {
0822: Statement s = null;
0823: Connection c = pc.getConnection();
0824: try {
0825: backend.addPendingWriteRequest(request);
0826:
0827: s = setupStatementOrPreparedStatement(request, backend,
0828: workerThread, c, false, false);
0829:
0830: if (request.requiresConnectionFlush()) {
0831: pc.setMustBeRenewed(true);
0832: }
0833:
0834: // Execute the query
0835: ExecuteUpdateResult eur;
0836: if (request.getPreparedStatementParameters() == null)
0837: eur = new ExecuteUpdateResult(s.executeUpdate(backend
0838: .rewriteQuery(request.getSqlOrTemplate())));
0839: else
0840: eur = new ExecuteUpdateResult(((PreparedStatement) s)
0841: .executeUpdate());
0842: // get warnings, if required
0843: if (request.getRetrieveSQLWarnings())
0844: eur.setStatementWarnings(s.getWarnings());
0845:
0846: if (request.requiresConnectionPoolFlush())
0847: backend.flagAllConnectionsForRenewal();
0848:
0849: if (request instanceof CreateRequest
0850: && ((CreateRequest) request)
0851: .createsTemporaryTable())
0852: pc.addTemporaryTables(request.getTableName());
0853: return eur;
0854: } catch (SQLException e) { // Something bad happened
0855: if (backend.isValidConnection(c))
0856: throw e; // Connection is valid, throw the exception
0857: else
0858: throw new BadConnectionException(e);
0859: } finally {
0860: backend.removePendingRequest(request);
0861: try {
0862: if (s != null)
0863: s.close();
0864: } catch (SQLException ignore) {
0865: }
0866: }
0867: }
0868:
0869: /**
0870: * Execute an update prepared statement on a backend. If the execution fails,
0871: * the connection is checked for validity. If the connection was not valid,
0872: * the query is automatically retried on a new connection.
0873: *
0874: * @param request the request to execute
0875: * @param backend the backend on which the request is executed
0876: * @param workerThread the backend worker thread executing this query (or null
0877: * if none)
0878: * @param pc connection used to create the statement
0879: * @param metadataCache MetadataCache (null if none)
0880: * @return ControllerResultSet containing the auto-generated keys
0881: * @throws SQLException if an error occurs
0882: * @throws BadConnectionException if the connection was bad
0883: */
0884: public static final GeneratedKeysResult executeStatementExecuteUpdateWithKeysOnBackend(
0885: AbstractWriteRequest request, DatabaseBackend backend,
0886: BackendWorkerThread workerThread, PooledConnection pc,
0887: MetadataCache metadataCache) throws SQLException,
0888: BadConnectionException {
0889: if (!backend.getDriverCompliance().supportGetGeneratedKeys())
0890: throw new SQLException("Backend " + backend.getName()
0891: + " does not support RETURN_GENERATED_KEYS");
0892:
0893: Statement s = null;
0894: Connection c = pc.getConnection();
0895: try {
0896: backend.addPendingWriteRequest(request);
0897:
0898: s = setupStatementOrPreparedStatement(request, backend,
0899: workerThread, c, false, true);
0900:
0901: if (request.requiresConnectionFlush()) {
0902: pc.setMustBeRenewed(true);
0903: }
0904: // Execute the query
0905: int updateCount;
0906: if (request.getPreparedStatementParameters() == null)
0907: updateCount = s.executeUpdate(backend
0908: .rewriteQuery(request.getSqlOrTemplate()),
0909: Statement.RETURN_GENERATED_KEYS);
0910: else
0911: updateCount = ((PreparedStatement) s).executeUpdate();
0912: // get warnings, if required
0913: SQLWarning stWarns = null;
0914: if (request.getRetrieveSQLWarnings()) {
0915: stWarns = s.getWarnings();
0916: }
0917: ControllerResultSet rs = new ControllerResultSet(request, s
0918: .getGeneratedKeys(), metadataCache, s, false);
0919: GeneratedKeysResult gkr = new GeneratedKeysResult(rs,
0920: updateCount);
0921: gkr.setStatementWarnings(stWarns);
0922:
0923: if (request.requiresConnectionPoolFlush())
0924: backend.flagAllConnectionsForRenewal();
0925:
0926: return gkr;
0927: } catch (SQLException e) { // Something bad happened
0928: if (backend.isValidConnection(c))
0929: throw e; // Connection is valid, throw the exception
0930: else
0931: throw new BadConnectionException(e);
0932: } finally {
0933: backend.removePendingRequest(request);
0934: try {
0935: if (s != null)
0936: s.close();
0937: } catch (SQLException ignore) {
0938: }
0939: }
0940: }
0941:
0942: /**
0943: * Execute a request that returns multiple results on the given backend. The
0944: * statement is setXXX if the driver has not processed the statement.
0945: *
0946: * @param request the request to execute
0947: * @param backend the backend on which to execute the stored procedure
0948: * @param workerThread the backend worker thread executing this query (or null
0949: * if none)
0950: * @param pc the connection on which to execute the stored procedure
0951: * @param metadataCache the matedatacache to build the ControllerResultSet
0952: * @return an <code>ExecuteResult</code> object
0953: * @throws SQLException if an error occurs
0954: * @throws BadConnectionException if the connection was bad
0955: */
0956: public static final ExecuteResult executeStatementExecuteOnBackend(
0957: AbstractRequest request, DatabaseBackend backend,
0958: BackendWorkerThread workerThread, PooledConnection pc,
0959: MetadataCache metadataCache) throws SQLException,
0960: BadConnectionException {
0961: Statement s = null;
0962: Connection c = pc.getConnection();
0963: try {
0964: backend.addPendingWriteRequest(request);
0965:
0966: // Disable fetch size when using execute()
0967: request.setFetchSize(0);
0968:
0969: s = setupStatementOrPreparedStatement(request, backend,
0970: workerThread, c, true, false);
0971:
0972: if (request.requiresConnectionFlush()) {
0973: pc.setMustBeRenewed(true);
0974: }
0975: // Execute the query
0976: boolean hasResult;
0977: if (request.getPreparedStatementParameters() == null)
0978: hasResult = s.execute(backend.rewriteQuery(request
0979: .getSqlOrTemplate()));
0980: else
0981: hasResult = ((PreparedStatement) s).execute();
0982:
0983: int updatedRows = 0;
0984: // Process the result and get all ResultSets or udpate counts
0985: ExecuteResult result = new ExecuteResult();
0986: // get warnings, if required
0987: if (request.getRetrieveSQLWarnings())
0988: result.setStatementWarnings(s.getWarnings());
0989: int niter = 0;
0990: do {
0991: if (hasResult) {
0992: ControllerResultSet crs = new ControllerResultSet(
0993: request, s.getResultSet(), metadataCache,
0994: null, true);
0995: result.addResult(crs);
0996: } else {
0997: updatedRows = s.getUpdateCount();
0998: result.addResult(updatedRows);
0999: }
1000: hasResult = s.getMoreResults();
1001: niter++;
1002: logUnreasonableNumberOfIterations(niter);
1003: } while (hasResult || (updatedRows != -1));
1004:
1005: if (request.requiresConnectionPoolFlush())
1006: backend.flagAllConnectionsForRenewal();
1007:
1008: return result;
1009: } catch (SQLException e) { // Something bad happened
1010: if (backend.isValidConnection(c))
1011: throw e; // Connection is valid, throw the exception
1012: else
1013: throw new BadConnectionException(e);
1014: } finally {
1015: backend.removePendingRequest(request);
1016: try {
1017: if (s != null)
1018: s.close();
1019: } catch (SQLException ignore) {
1020: }
1021: }
1022: }
1023:
1024: /**
1025: * Fetch Out and Named parameters if any. The information about the parameters
1026: * is found in the StoredProcedure object and the results are stored in the
1027: * same structures.
1028: * <p>
1029: * After calling this method, the stored procedure object does not contain
1030: * anymore the types of the parameters but their returned values.
1031: *
1032: * @param cs callable statement to fetch from
1033: * @param proc stored procedure object with parameters information
1034: * @throws SQLException if an error occurs during fetching
1035: */
1036: private static void fetchOutAndNamedParameters(
1037: CallableStatement cs, StoredProcedure proc)
1038: throws SQLException {
1039: // First fetch the out parameters
1040: List outParamIndexes = proc.getOutParameterIndexes();
1041: if (outParamIndexes != null) {
1042: for (Iterator iter = outParamIndexes.iterator(); iter
1043: .hasNext();) {
1044: Object index = iter.next();
1045: if (index instanceof Integer)
1046: proc.setOutParameterValue(index, cs
1047: .getObject(((Integer) index).intValue()));
1048: else
1049: // Named OUT parameter
1050: proc.setOutParameterValue(index, cs
1051: .getObject((String) index));
1052: }
1053: }
1054:
1055: // Fetch the named parameters
1056: List namedParamNames = proc.getNamedParameterNames();
1057: if (namedParamNames != null) {
1058: for (Iterator iter = namedParamNames.iterator(); iter
1059: .hasNext();) {
1060: // Overwrite the type with the result (re-use the same map)
1061: String paramName = (String) iter.next();
1062: proc.setNamedParameterValue(paramName, cs
1063: .getObject(paramName));
1064: }
1065: }
1066: }
1067:
1068: /**
1069: * Setup a Statement or a PreparedStatement (decoded if a SQL template is
1070: * found in the request).
1071: */
1072: private static CallableStatement setupCallableStatement(
1073: StoredProcedure proc, DatabaseBackend backend,
1074: BackendWorkerThread workerThread, Connection c,
1075: boolean setupResultSetParameters) throws SQLException {
1076: CallableStatement cs; // Can also be used as a PreparedStatement
1077: cs = c.prepareCall(backend
1078: .rewriteQuery(proc.getSqlOrTemplate()));
1079: if (proc.getPreparedStatementParameters() != null)
1080: PreparedStatementSerialization.setCallableStatement(
1081: backend.rewriteQuery(proc
1082: .getPreparedStatementParameters()), cs,
1083: proc);
1084:
1085: // Let the worker thread know which statement we are using in case there is
1086: // a need to cancel that statement during its execution
1087: if (workerThread != null)
1088: workerThread.setCurrentStatement(cs);
1089:
1090: DriverCompliance driverCompliance = backend
1091: .getDriverCompliance();
1092: if (driverCompliance.supportSetQueryTimeout())
1093: cs.setQueryTimeout(proc.getTimeout());
1094:
1095: if (setupResultSetParameters) {
1096: if ((proc.getCursorName() != null)
1097: && (driverCompliance.supportSetCursorName()))
1098: cs.setCursorName(proc.getCursorName());
1099: if ((proc.getFetchSize() != 0)
1100: && driverCompliance.supportSetFetchSize())
1101: cs.setFetchSize(proc.getFetchSize());
1102: if ((proc.getMaxRows() > 0)
1103: && driverCompliance.supportSetMaxRows())
1104: cs.setMaxRows(proc.getMaxRows());
1105: }
1106: cs.setEscapeProcessing(proc.getEscapeProcessing());
1107: return cs;
1108: }
1109:
1110: /**
1111: * Execute a read stored procedure on the given backend. The callable
1112: * statement is setXXX if the driver has not processed the statement.<br>
1113: *
1114: * @param proc the stored procedure to execute
1115: * @param backend the backend on which to execute the stored procedure
1116: * @param workerThread the backend worker thread executing this query (or null
1117: * if none)
1118: * @param c the connection on which to execute the stored procedure
1119: * @param metadataCache the matedatacache to build the ControllerResultSet
1120: * @return the controllerResultSet
1121: * @throws SQLException if an error occurs
1122: * @throws BadConnectionException if the connection was bad
1123: */
1124: public static final ControllerResultSet executeCallableStatementExecuteQueryOnBackend(
1125: StoredProcedure proc, DatabaseBackend backend,
1126: BackendWorkerThread workerThread, Connection c,
1127: MetadataCache metadataCache) throws SQLException,
1128: BadConnectionException {
1129: CallableStatement cs = null;
1130: ResultSet backendRS = null;
1131: try {
1132: backend.addPendingReadRequest(proc);
1133:
1134: cs = setupCallableStatement(proc, backend, workerThread, c,
1135: true);
1136:
1137: // Execute the query
1138: backendRS = cs.executeQuery();
1139:
1140: SQLWarning stWarns = null;
1141: if (proc.getRetrieveSQLWarnings()) {
1142: stWarns = cs.getWarnings();
1143: }
1144: ControllerResultSet rs = new ControllerResultSet(proc,
1145: backendRS, metadataCache, cs, false);
1146: rs.setStatementWarnings(stWarns);
1147: fetchOutAndNamedParameters(cs, proc);
1148:
1149: if (proc.requiresConnectionPoolFlush())
1150: backend.flagAllConnectionsForRenewal();
1151:
1152: return rs;
1153: } catch (SQLException e) { // Something bad happened
1154: if (backend.isValidConnection(c))
1155: throw e; // Connection is valid, throw the exception
1156: else
1157: throw new BadConnectionException(e);
1158: } finally {
1159: // we can close this resultset if fetch size was 0
1160: if (backendRS != null && proc.getFetchSize() == 0) {
1161: try {
1162: backendRS.close();
1163: } catch (SQLException ignore) {
1164: }
1165: }
1166: backend.removePendingRequest(proc);
1167: }
1168: }
1169:
1170: /**
1171: * Execute a write stored procedure on the given backend. The callable
1172: * statement is setXXX if the driver has not processed the statement.
1173: *
1174: * @param proc the stored procedure to execute
1175: * @param backend the backend on which to execute the stored procedure
1176: * @param workerThread the backend worker thread executing this query (or null
1177: * if none)
1178: * @param c the connection on which to execute the stored procedure
1179: * @return the number of updated rows
1180: * @throws SQLException if an error occurs
1181: * @throws BadConnectionException if the connection was bad
1182: */
1183: public static final ExecuteUpdateResult executeCallableStatementExecuteUpdateOnBackend(
1184: StoredProcedure proc, DatabaseBackend backend,
1185: BackendWorkerThread workerThread, PooledConnection pc)
1186: throws SQLException, BadConnectionException {
1187: CallableStatement cs = null;
1188: Connection c = pc.getConnection();
1189: try {
1190: backend.addPendingWriteRequest(proc);
1191:
1192: cs = setupCallableStatement(proc, backend, workerThread, c,
1193: false);
1194:
1195: if (proc.requiresConnectionFlush()) {
1196: pc.setMustBeRenewed(true);
1197: }
1198:
1199: // Execute the query
1200: ExecuteUpdateResult eur = new ExecuteUpdateResult(cs
1201: .executeUpdate());
1202: // get warnings, if required
1203: if (proc.getRetrieveSQLWarnings())
1204: eur.setStatementWarnings(cs.getWarnings());
1205:
1206: fetchOutAndNamedParameters(cs, proc);
1207:
1208: if (proc.requiresConnectionPoolFlush())
1209: backend.flagAllConnectionsForRenewal();
1210:
1211: return eur;
1212: } catch (SQLException e) { // Something bad happened
1213: if (backend.isValidConnection(c))
1214: throw e; // Connection is valid, throw the exception
1215: else
1216: throw new BadConnectionException(e);
1217: } finally {
1218: backend.removePendingRequest(proc);
1219: try {
1220: if (cs != null)
1221: cs.close();
1222: } catch (SQLException ignore) {
1223: }
1224: }
1225: }
1226:
1227: /**
1228: * Execute a stored procedure that returns multiple results on the given
1229: * backend. The callable statement is setXXX if the driver has not processed
1230: * the statement.
1231: *
1232: * @param proc the stored procedure to execute
1233: * @param backend the backend on which to execute the stored procedure
1234: * @param workerThread the backend worker thread executing this query (or null
1235: * if none)
1236: * @param c the connection on which to execute the stored procedure
1237: * @param metadataCache the matedatacache to build the ControllerResultSet
1238: * @return an <code>ExecuteResult</code> object
1239: * @throws SQLException if an error occurs
1240: * @throws BadConnectionException if the connection was bad
1241: */
1242: public static final ExecuteResult executeCallableStatementExecuteOnBackend(
1243: StoredProcedure proc, DatabaseBackend backend,
1244: BackendWorkerThread workerThread, PooledConnection pc,
1245: MetadataCache metadataCache) throws SQLException,
1246: BadConnectionException {
1247: CallableStatement cs = null;
1248: Connection c = pc.getConnection();
1249: try {
1250: backend.addPendingWriteRequest(proc);
1251:
1252: // Disable fetch size when using execute()
1253: proc.setFetchSize(0);
1254:
1255: cs = setupCallableStatement(proc, backend, workerThread, c,
1256: true);
1257:
1258: if (proc.requiresConnectionFlush()) {
1259: pc.setMustBeRenewed(true);
1260: }
1261:
1262: // Execute the query
1263: boolean hasResult = cs.execute();
1264: int updatedRows = 0;
1265: // Process the result and get all ResultSets or udpate counts
1266: ExecuteResult result = new ExecuteResult();
1267: // get warnings, if required
1268: if (proc.getRetrieveSQLWarnings())
1269: result.setStatementWarnings(cs.getWarnings());
1270: int niter = 0;
1271: do {
1272: if (hasResult) {
1273: ControllerResultSet crs = new ControllerResultSet(
1274: proc, cs.getResultSet(), metadataCache,
1275: null, true);
1276: result.addResult(crs);
1277: } else {
1278: updatedRows = cs.getUpdateCount();
1279: result.addResult(updatedRows);
1280: }
1281: if (updatedRows != -1)
1282: hasResult = cs.getMoreResults();
1283:
1284: niter++;
1285: logUnreasonableNumberOfIterations(niter);
1286: } while (hasResult || (updatedRows != -1));
1287:
1288: fetchOutAndNamedParameters(cs, proc);
1289:
1290: if (proc.requiresConnectionPoolFlush())
1291: backend.flagAllConnectionsForRenewal();
1292:
1293: return result;
1294: } catch (SQLException e) { // Something bad happened
1295: if (backend.isValidConnection(c))
1296: throw e; // Connection is valid, throw the exception
1297: else
1298: throw new BadConnectionException(e);
1299: } finally {
1300: backend.removePendingRequest(proc);
1301: try {
1302: if (cs != null)
1303: cs.close();
1304: } catch (SQLException ignore) {
1305: }
1306: }
1307: }
1308:
1309: /**
1310: * Sanity check to log buggy JDBC drivers triggering infinite loops. Logs at
1311: * regular intervals. Warning: does log on zero, so please start at 1!
1312: *
1313: * @param niter number to check
1314: */
1315: private static void logUnreasonableNumberOfIterations(int niter) {
1316: if (niter % 1024 != 0)
1317: return;
1318:
1319: // The time has come...
1320: Throwable t = new Throwable(); // get a stacktrace
1321: logger.warn(niter + " getMoreResults() iterations", t);
1322: }
1323:
1324: /**
1325: * Get PreparedStatement metadata before the statement is executed.
1326: *
1327: * @param sqlTemplate the PreparedStatement sql template
1328: * @param backend the backend on which we execute the request
1329: * @param c the connection to create the statement from
1330: * @return an empty ResultSet with the associated metadata
1331: * @throws SQLException if an error occurs
1332: * @throws BadConnectionException if the databsae connection was bad
1333: */
1334: public static final ControllerResultSet preparedStatementGetMetaDataOnBackend(
1335: String sqlTemplate, DatabaseBackend backend, Connection c)
1336: throws SQLException, BadConnectionException {
1337: try {
1338: PreparedStatement ps = c.prepareStatement(sqlTemplate);
1339: return new ControllerResultSet(
1340: ControllerConstants.CONTROLLER_FACTORY
1341: .getResultSetMetaDataFactory()
1342: .copyResultSetMetaData(ps.getMetaData(),
1343: null), new ArrayList());
1344: } catch (SQLException e) { // Something bad happened
1345: if (backend.isValidConnection(c))
1346: throw e; // Connection is valid, throw the exception
1347: else
1348: throw new BadConnectionException(e);
1349: }
1350: }
1351:
1352: //
1353: // Transaction management
1354: //
1355:
1356: /**
1357: * Abort a transaction and all its currently pending or executing queries.
1358: *
1359: * @param tm The transaction marker metadata
1360: * @throws SQLException if an error occurs
1361: */
1362: public abstract void abort(TransactionMetaData tm)
1363: throws SQLException;
1364:
1365: /**
1366: * Begin a new transaction.
1367: *
1368: * @param tm The transaction marker metadata
1369: * @throws SQLException if an error occurs
1370: */
1371: public abstract void begin(TransactionMetaData tm)
1372: throws SQLException;
1373:
1374: /**
1375: * Commit a transaction.
1376: *
1377: * @param tm The transaction marker metadata
1378: * @throws AllBackendsFailedException if all backends failed to execute the
1379: * request
1380: * @throws SQLException if an error occurs
1381: */
1382: public abstract void commit(TransactionMetaData tm)
1383: throws AllBackendsFailedException, SQLException;
1384:
1385: /**
1386: * Rollback a transaction.
1387: *
1388: * @param tm The transaction marker metadata
1389: * @throws AllBackendsFailedException if all backends failed to execute the
1390: * request
1391: * @throws SQLException if an error occurs
1392: */
1393: public abstract void rollback(TransactionMetaData tm)
1394: throws AllBackendsFailedException, SQLException;
1395:
1396: /**
1397: * Rollback a transaction to a savepoint
1398: *
1399: * @param tm The transaction marker metadata
1400: * @param savepointName The name of the savepoint
1401: * @throws AllBackendsFailedException if all backends failed to execute the
1402: * request
1403: * @throws SQLException if an error occurs
1404: */
1405: public abstract void rollbackToSavepoint(TransactionMetaData tm,
1406: String savepointName) throws AllBackendsFailedException,
1407: SQLException;
1408:
1409: /**
1410: * Set a savepoint to a transaction.
1411: *
1412: * @param tm The transaction marker metadata
1413: * @param name The name of the new savepoint
1414: * @throws AllBackendsFailedException if all backends failed to execute the
1415: * request
1416: * @throws SQLException if an error occurs
1417: */
1418: public abstract void setSavepoint(TransactionMetaData tm,
1419: String name) throws AllBackendsFailedException,
1420: SQLException;
1421:
1422: /**
1423: * Release a savepoint from a transaction
1424: *
1425: * @param tm The transaction marker metadata
1426: * @param name The name of the savepoint ro release
1427: * @throws AllBackendsFailedException if all backends failed to execute the
1428: * request
1429: * @throws SQLException if an error occurs
1430: */
1431: public abstract void releaseSavepoint(TransactionMetaData tm,
1432: String name) throws AllBackendsFailedException,
1433: SQLException;
1434:
1435: /**
1436: * Close a persistent connection.
1437: *
1438: * @param login login requesting the connection closing
1439: * @param persistentConnectionId id of the persistent connection to close
1440: * @throws SQLException if an error occurs
1441: */
1442: public abstract void closePersistentConnection(String login,
1443: long persistentConnectionId) throws SQLException;
1444:
1445: /**
1446: * Open a persistent connection.
1447: *
1448: * @param login login requesting the connection closing
1449: * @param persistentConnectionId id of the persistent connection to open
1450: * @throws SQLException if an error occurs
1451: */
1452: public abstract void openPersistentConnection(String login,
1453: long persistentConnectionId) throws SQLException;
1454:
1455: /**
1456: * Factorized code to start a transaction on a backend and to retrieve a
1457: * connection on this backend
1458: *
1459: * @param backend the backend needed to check valid connection against this
1460: * backend test statement
1461: * @param cm the connection manager to use to retrieve connections
1462: * @param request request that will execute (must carry transaction id and
1463: * transaction isolation level (does nothing if equals to
1464: * Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL))
1465: * @return a valid connection with a started transaction
1466: * @throws SQLException if the backend is valid but set autocommit cannot be
1467: * set to false
1468: * @throws UnreachableBackendException if the backend is not reachable, ie not
1469: * valid connection can be retrieved
1470: * @see org.continuent.sequoia.driver.Connection#DEFAULT_TRANSACTION_ISOLATION_LEVEL
1471: */
1472: public static final Connection getConnectionAndBeginTransaction(
1473: DatabaseBackend backend, AbstractConnectionManager cm,
1474: AbstractRequest request) throws SQLException,
1475: UnreachableBackendException {
1476: PooledConnection pc = null;
1477: boolean isConnectionValid = false;
1478: Connection c;
1479:
1480: do {
1481: if (request.isPersistentConnection()) { // Retrieve the persistent connection and register it for the
1482: // transaction
1483: pc = cm.retrieveConnectionInAutoCommit(request);
1484: cm.registerConnectionForTransaction(pc, request
1485: .getTransactionId());
1486: } else { // Get a new connection for the transaction
1487: pc = cm.getConnectionForTransaction(request
1488: .getTransactionId());
1489: }
1490:
1491: // Sanity check
1492: if (pc == null) {
1493: throw new UnreachableBackendException(Translate.get(
1494: "loadbalancer.unable.get.connection",
1495: new String[] {
1496: String.valueOf(request
1497: .getTransactionId()),
1498: backend.getName() }));
1499: }
1500: c = pc.getConnection();
1501: try {
1502: if (request.getTransactionIsolation() != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL) {
1503: /*
1504: * A user specified transaction isolation will prevail on any other
1505: * settings
1506: */
1507: pc.setTransactionIsolation(request
1508: .getTransactionIsolation());
1509: } else if (defaultTransactionIsolationLevel != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL) {
1510: /*
1511: * The defaultTransactionIsolationLevel can be enforced in the
1512: * configuration file to force all transactions to use this level of
1513: * isolation
1514: */
1515: pc
1516: .setTransactionIsolation(defaultTransactionIsolationLevel);
1517: }
1518:
1519: c.setAutoCommit(false);
1520: isConnectionValid = true;
1521: } catch (SQLException e) {
1522: if (backend.isValidConnection(c))
1523: throw e; // Connection is valid, throw the exception
1524: else {
1525: cm.deleteConnection(request.getTransactionId());
1526: if (request.isPersistentConnection()) {
1527: cm.deletePersistentConnection(request
1528: .getPersistentConnectionId());
1529: }
1530: }
1531: }
1532: } while (!isConnectionValid);
1533: if (pc == null) {
1534: if (logger.isErrorEnabled()) {
1535: logger.error("Got a null connection [backend="
1536: + backend.getName() + ", tid="
1537: + request.getTransactionId() + "]");
1538: }
1539: }
1540: return c;
1541: }
1542:
1543: //
1544: // Backends management
1545: //
1546:
1547: /**
1548: * Enable a backend without further check. The backend is at least read
1549: * enabled but could also be enabled for writes. Ask the corresponding
1550: * connection manager to initialize the connections if needed.
1551: *
1552: * @param db The database backend to enable
1553: * @param writeEnabled True if the backend must be enabled for writes
1554: * @throws SQLException if an error occurs
1555: */
1556: public abstract void enableBackend(DatabaseBackend db,
1557: boolean writeEnabled) throws SQLException;
1558:
1559: /**
1560: * Disable a backend without further check. Ask the corresponding connection
1561: * manager to finalize the connections if needed. This method should not be
1562: * called directly but instead should access the
1563: * <code>RequestManager.disableBackend(...)</code> method.
1564: *
1565: * @param db The database backend to disable
1566: * @param forceDisable true if disable must be forced
1567: * @throws SQLException if an error occurs
1568: */
1569: public abstract void disableBackend(DatabaseBackend db,
1570: boolean forceDisable) throws SQLException;
1571:
1572: /**
1573: * Get the number of currently enabled backends. 0 means that no backend is
1574: * available.
1575: *
1576: * @return number of currently enabled backends
1577: */
1578: public int getNumberOfEnabledBackends() {
1579: return enabledBackends.size();
1580: }
1581:
1582: /**
1583: * Get information about the Request Load Balancer
1584: *
1585: * @return <code>String</code> containing information
1586: */
1587: public abstract String getInformation();
1588:
1589: /**
1590: * Get information about the Request Load Balancer in xml
1591: *
1592: * @return <code>String</code> containing information, xml formatted
1593: */
1594: public abstract String getXmlImpl();
1595:
1596: /**
1597: * @see org.continuent.sequoia.common.xml.XmlComponent#getXml()
1598: */
1599: public String getXml() {
1600: StringBuffer info = new StringBuffer();
1601: info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + " "
1602: + DatabasesXmlTags.ATT_transactionIsolation + "=\"");
1603: switch (defaultTransactionIsolationLevel) {
1604: case Connection.TRANSACTION_READ_UNCOMMITTED:
1605: info.append(DatabasesXmlTags.VAL_readUncommitted);
1606: break;
1607: case Connection.TRANSACTION_READ_COMMITTED:
1608: info.append(DatabasesXmlTags.VAL_readCommitted);
1609: break;
1610: case Connection.TRANSACTION_REPEATABLE_READ:
1611: info.append(DatabasesXmlTags.VAL_repeatableRead);
1612: break;
1613: case Connection.TRANSACTION_SERIALIZABLE:
1614: info.append(DatabasesXmlTags.VAL_serializable);
1615: break;
1616: default:
1617: info.append(DatabasesXmlTags.VAL_databaseDefault);
1618: break;
1619: }
1620: info.append("\">");
1621: info.append(getXmlImpl());
1622: info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">");
1623: return info.toString();
1624: }
1625: }
|