0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Julie Marguerite, Greg Ward, Nicolas Modrzyk, Vadim Kassin,
0023: * Jean-Bernard van Zuylen, Peter Royal.
0024: */package org.continuent.sequoia.controller.requestmanager;
0025:
0026: import java.sql.SQLException;
0027: import java.util.ArrayList;
0028: import java.util.Date;
0029: import java.util.Hashtable;
0030: import java.util.Iterator;
0031: import java.util.LinkedList;
0032: import java.util.List;
0033: import java.util.SortedSet;
0034: import java.util.Vector;
0035:
0036: import javax.management.NotCompliantMBeanException;
0037:
0038: import org.continuent.sequoia.common.exceptions.BackupException;
0039: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0040: import org.continuent.sequoia.common.exceptions.RollbackException;
0041: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
0042: import org.continuent.sequoia.common.i18n.Translate;
0043: import org.continuent.sequoia.common.jmx.JmxConstants;
0044: import org.continuent.sequoia.common.jmx.management.BackendInfo;
0045: import org.continuent.sequoia.common.jmx.management.BackendState;
0046: import org.continuent.sequoia.common.jmx.management.DumpInfo;
0047: import org.continuent.sequoia.common.jmx.mbeans.RequestManagerMBean;
0048: import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
0049: import org.continuent.sequoia.common.log.Trace;
0050: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0051: import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0052: import org.continuent.sequoia.common.sql.schema.DatabaseTable;
0053: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0054: import org.continuent.sequoia.common.xml.XmlComponent;
0055: import org.continuent.sequoia.controller.backend.BackendStateListener;
0056: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0057: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0058: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0059: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0060: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0061: import org.continuent.sequoia.controller.backup.BackupManager;
0062: import org.continuent.sequoia.controller.backup.Backuper;
0063: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
0064: import org.continuent.sequoia.controller.cache.parsing.ParsingCache;
0065: import org.continuent.sequoia.controller.cache.result.AbstractResultCache;
0066: import org.continuent.sequoia.controller.cache.result.entries.AbstractResultCacheEntry;
0067: import org.continuent.sequoia.controller.core.ControllerConstants;
0068: import org.continuent.sequoia.controller.jmx.AbstractStandardMBean;
0069: import org.continuent.sequoia.controller.jmx.MBeanServerManager;
0070: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0071: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
0072: import org.continuent.sequoia.controller.loadbalancer.LoadBalancerControl;
0073: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
0074: import org.continuent.sequoia.controller.recoverylog.BackendRecoveryInfo;
0075: import org.continuent.sequoia.controller.recoverylog.RecoverThread;
0076: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0077: import org.continuent.sequoia.controller.requests.AbstractRequest;
0078: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0079: import org.continuent.sequoia.controller.requests.AlterRequest;
0080: import org.continuent.sequoia.controller.requests.CreateRequest;
0081: import org.continuent.sequoia.controller.requests.DropRequest;
0082: import org.continuent.sequoia.controller.requests.ParsingGranularities;
0083: import org.continuent.sequoia.controller.requests.RequestFactory;
0084: import org.continuent.sequoia.controller.requests.RequestType;
0085: import org.continuent.sequoia.controller.requests.SelectRequest;
0086: import org.continuent.sequoia.controller.requests.StoredProcedure;
0087: import org.continuent.sequoia.controller.requests.UpdateRequest;
0088: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0089: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0090: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread;
0091: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
0092: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
0093: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
0094: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
0095: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
0096: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
0097:
0098: /**
0099: * This class defines the Request Manager.
0100: * <p>
0101: * The RM is composed of a Request Scheduler, an optional Query Cache, and a
0102: * Load Balancer and an optional Recovery Log.
0103: *
0104: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0105: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
0106: * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
0107: * @author <a href="mailto:vadim@kase.kz">Vadim Kassin </a>
0108: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0109: * </a>
0110: * @version 1.0
0111: */
0112: public class RequestManager extends AbstractStandardMBean implements
0113: XmlComponent, RequestManagerMBean {
0114:
0115: //
0116: // How the code is organized ?
0117: //
0118: // 1. Member variables
0119: // 2. Constructor(s)
0120: // 3. Request handling
0121: // 4. Transaction handling
0122: // 5. Database backend management
0123: // 6. Getter/Setter (possibly in alphabetical order)
0124: //
0125:
0126: /** begin timeout in ms */
0127: protected long beginTimeout;
0128:
0129: /** commit timeout in ms */
0130: protected long commitTimeout;
0131:
0132: /** rollback timeout in ms */
0133: protected long rollbackTimeout;
0134:
0135: /** The virtual database owning this Request Manager */
0136: protected VirtualDatabase vdb;
0137:
0138: /** The request scheduler to order and schedule requests */
0139: protected AbstractScheduler scheduler;
0140:
0141: /** An optional request cache to cache responses to SQL requests */
0142: protected AbstractResultCache resultCache;
0143:
0144: /** The request load balancer to use to send requests to the databases */
0145: protected AbstractLoadBalancer loadBalancer;
0146:
0147: /** An optional recovery log */
0148: protected RecoveryLog recoveryLog;
0149:
0150: /** The backup manager responsible for backup and restore of backends */
0151: protected BackupManager backupManager;
0152:
0153: // The virtual dabase schema
0154: protected DatabaseSchema dbs;
0155:
0156: /** <code>true</code> if schema is no more up-to-date and needs a refresh */
0157: private boolean schemaIsDirty = false;
0158:
0159: private boolean schemaIsStatic = false;
0160:
0161: private boolean isCaseSensitiveParsing = false;
0162:
0163: protected ParsingCache parsingCache = null;
0164:
0165: private MetadataCache metadataCache = null;
0166:
0167: // SQL queries parsing granularity according to Scheduler, ResultCache and
0168: // LoadBalancer required granularity
0169: protected int schedulerParsingranularity = ParsingGranularities.NO_PARSING;
0170:
0171: private int cacheParsingranularity = ParsingGranularities.NO_PARSING;
0172:
0173: private int loadBalancerParsingranularity = ParsingGranularities.NO_PARSING;
0174:
0175: protected int requiredParsingGranularity = ParsingGranularities.NO_PARSING;
0176:
0177: private long requestId = 0;
0178:
0179: /* end user logger */
0180: protected static Trace endUserLogger = Trace
0181: .getLogger("org.continuent.sequoia.enduser");
0182:
0183: /**
0184: * Hashtable<Long, TransactionMetaData>, the <code>Long</code> key is
0185: * the same transaction ID as the <code>transactionId</code> field of its
0186: * corresponding <code>TransactionMetaData</code> value.
0187: */
0188: // <pedantic>This Hashtable should be called transactionMetaData and the
0189: // TransactionMetaData class should be renamed TransactionMetaDatum but
0190: // 1/ I don't care
0191: // 2/ I don't spell check my code :-)
0192: // Jeff.
0193: // </pedantic>
0194: protected Hashtable transactionMetaDatas;
0195:
0196: /**
0197: * Hashtable<Long, String>, the <code>Long</code> being a transaction
0198: * ID and its corresponding <code>String</code> being the name of a
0199: * savepoint.
0200: */
0201: protected Hashtable tidSavepoints;
0202:
0203: protected Trace logger = null;
0204:
0205: private BackendStateListener backendStateListener;
0206:
0207: //
0208: // Constructors
0209: //
0210:
0211: /**
0212: * Creates a new <code>RequestManager</code> instance.
0213: *
0214: * @param vdb the virtual database this request manager belongs to
0215: * @param scheduler the Request Scheduler to use
0216: * @param cache a Query Cache implementation
0217: * @param loadBalancer the Request Load Balancer to use
0218: * @param recoveryLog the Log Recovery to use
0219: * @param beginTimeout timeout in seconds for begin
0220: * @param commitTimeout timeout in seconds for commit
0221: * @param rollbackTimeout timeout in seconds for rollback
0222: * @throws SQLException if an error occurs
0223: * @throws NotCompliantMBeanException if the MBean is not JMX compliant
0224: */
0225: public RequestManager(VirtualDatabase vdb,
0226: AbstractScheduler scheduler, AbstractResultCache cache,
0227: AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog,
0228: long beginTimeout, long commitTimeout, long rollbackTimeout)
0229: throws SQLException, NotCompliantMBeanException {
0230: super (RequestManagerMBean.class);
0231: this .vdb = vdb;
0232: assignAndCheckSchedulerLoadBalancerValidity(scheduler,
0233: loadBalancer);
0234: // requiredParsingGranularity is the maximum of each component granularity
0235: this .resultCache = cache;
0236: if (resultCache != null) {
0237: cacheParsingranularity = cache.getParsingGranularity();
0238: if (cacheParsingranularity > requiredParsingGranularity)
0239: requiredParsingGranularity = cacheParsingranularity;
0240: }
0241: setRecoveryLog(recoveryLog);
0242: initRequestManagerVariables(vdb, beginTimeout, commitTimeout,
0243: rollbackTimeout);
0244: logger.info(Translate.get("requestmanager.parsing.granularity",
0245: ParsingGranularities
0246: .getInformation(requiredParsingGranularity)));
0247:
0248: if (MBeanServerManager.isJmxEnabled()) {
0249: try {
0250: MBeanServerManager.registerMBean(this , JmxConstants
0251: .getRequestManagerObjectName(vdb
0252: .getVirtualDatabaseName()));
0253:
0254: } catch (Exception e) {
0255: logger
0256: .error(Translate
0257: .get("jmx.failed.register.mbean.requestmanager"));
0258: }
0259: }
0260: }
0261:
0262: /**
0263: * Retrieve the last known checkpoint from the recovery log and set it for
0264: * each backend.
0265: */
0266: public void initBackendsLastKnownCheckpointFromRecoveryLog() {
0267: if (recoveryLog == null)
0268: return;
0269: String databaseName = vdb.getVirtualDatabaseName();
0270: ArrayList backends = vdb.getBackends();
0271: int size = backends.size();
0272: DatabaseBackend backend;
0273: BackendRecoveryInfo info;
0274: for (int i = 0; i < size; i++) {
0275: backend = (DatabaseBackend) backends.get(i);
0276: try {
0277: info = recoveryLog.getBackendRecoveryInfo(databaseName,
0278: backend.getName());
0279: String checkpoint = info.getCheckpoint();
0280: if ((checkpoint == null) || ("".equals(checkpoint))) { // No last known checkpoint
0281: if (info.getBackendState() != BackendState.UNKNOWN) {
0282: String msg = "Backend "
0283: + backend.getName()
0284: + " was not properly stopped, manual recovery will be needed ("
0285: + info + ")";
0286: logger.warn(msg);
0287: throw new SQLException(msg);
0288: }
0289: }
0290: backend.setLastKnownCheckpoint(checkpoint);
0291: } catch (SQLException e) {
0292: if (logger.isDebugEnabled()) {
0293: logger.debug(e.getMessage(), e);
0294: }
0295: backend.setState(BackendState.UNKNOWN);
0296: // Also sets backend.setLastKnownCheckpoint(null);
0297: }
0298: }
0299: }
0300:
0301: /**
0302: * Check that Scheduler and Load Balancer are not null and have compatible
0303: * RAIDb levels.
0304: *
0305: * @param aScheduler the scheduler to set
0306: * @param aLoadBalancer the load balancer to set
0307: * @throws SQLException if an error occurs
0308: */
0309: private void assignAndCheckSchedulerLoadBalancerValidity(
0310: AbstractScheduler aScheduler,
0311: AbstractLoadBalancer aLoadBalancer) throws SQLException {
0312: if (aScheduler == null)
0313: throw new SQLException(Translate
0314: .get("requestmanager.null.scheduler"));
0315:
0316: if (aLoadBalancer == null)
0317: throw new SQLException(Translate
0318: .get("requestmanager.null.loadbalancer"));
0319:
0320: if (aScheduler.getRAIDbLevel() != aLoadBalancer.getRAIDbLevel())
0321: throw new SQLException(Translate.get(
0322: "requestmanager.incompatible.raidb.levels",
0323: new String[] { "" + aScheduler.getRAIDbLevel(),
0324: "" + aLoadBalancer.getRAIDbLevel() }));
0325:
0326: // requiredParsingGranularity is the maximum of each component granularity
0327: setScheduler(aScheduler);
0328: schedulerParsingranularity = aScheduler.getParsingGranularity();
0329: requiredParsingGranularity = schedulerParsingranularity;
0330: setLoadBalancer(aLoadBalancer);
0331: loadBalancerParsingranularity = aLoadBalancer
0332: .getParsingGranularity();
0333: if (loadBalancerParsingranularity > requiredParsingGranularity)
0334: requiredParsingGranularity = loadBalancerParsingranularity;
0335: }
0336:
0337: /**
0338: * Get the parsing from the parsing cache or parse the query.
0339: *
0340: * @param request the request to be parsed (request will be parsed after this
0341: * call)
0342: * @throws SQLException if an error occurs during parsing
0343: */
0344: public void getParsingFromCacheOrParse(AbstractRequest request)
0345: throws SQLException {
0346: /*
0347: * If we need to parse the request, try to get the parsing from the cache.
0348: * Note that if we have a cache miss but backgroundParsing has been turned
0349: * on, then this call will start a ParsedThread in background.
0350: */
0351: if ((requiredParsingGranularity != ParsingGranularities.NO_PARSING)
0352: && (!request.isParsed())) {
0353: if (parsingCache == null)
0354: request.parse(getDatabaseSchema(),
0355: requiredParsingGranularity,
0356: isCaseSensitiveParsing);
0357: else
0358: parsingCache
0359: .getParsingFromCacheAndParseIfMissing(request);
0360: }
0361: }
0362:
0363: /**
0364: * Method initRequestManagerVariables.
0365: *
0366: * @param vdb the virtual database
0367: * @param beginTimeout timeout for begin operation
0368: * @param commitTimeout timeout for commit operation
0369: * @param rollbackTimeout timeout for rollback operation
0370: */
0371: private void initRequestManagerVariables(VirtualDatabase vdb,
0372: long beginTimeout, long commitTimeout, long rollbackTimeout) {
0373: this .transactionMetaDatas = new Hashtable();
0374: this .tidSavepoints = new Hashtable();
0375: this .beginTimeout = beginTimeout;
0376: this .commitTimeout = commitTimeout;
0377: this .rollbackTimeout = rollbackTimeout;
0378: this .vdb = vdb;
0379: logger = Trace
0380: .getLogger("org.continuent.sequoia.controller.RequestManager."
0381: + vdb.getDatabaseName());
0382: }
0383:
0384: //
0385: // Request Handling
0386: //
0387:
0388: /**
0389: * Close the given persistent connection.
0390: *
0391: * @param login login to use to retrieve the right connection pool
0392: * @param persistentConnectionId id of the persistent connection to close
0393: */
0394: public void closePersistentConnection(String login,
0395: long persistentConnectionId) {
0396: VirtualDatabaseWorkerThread vdbwt = vdb
0397: .getVirtualDatabaseWorkerThreadForPersistentConnection(persistentConnectionId);
0398: if (vdbwt != null)
0399: vdbwt.notifyClose(persistentConnectionId);
0400:
0401: try {
0402: scheduler.scheduleClosePersistentConnection();
0403:
0404: // No need to wait for task completion (cannot fail), so log right away.
0405: if (recoveryLog != null)
0406: recoveryLog.logClosePersistentConnection(login,
0407: persistentConnectionId);
0408:
0409: loadBalancer.closePersistentConnection(login,
0410: persistentConnectionId);
0411: } catch (NoMoreBackendException ignore) {
0412: // Removes ugly trace as reported in SEQUOIA-390
0413: } catch (SQLException e) {
0414: logger.warn("Failed to close persistent connection "
0415: + persistentConnectionId, e);
0416: } finally {
0417: scheduler
0418: .closePersistentConnectionCompleted(persistentConnectionId);
0419: }
0420: }
0421:
0422: /**
0423: * Returns true if the virtual database has opened the given persistent
0424: * connection.
0425: *
0426: * @param persistentConnectionId id of the persistent connection to check
0427: * @return true if the connection is open
0428: */
0429: public boolean hasPersistentConnection(long persistentConnectionId) {
0430: return scheduler
0431: .hasPersistentConnection(persistentConnectionId);
0432: }
0433:
0434: /**
0435: * Open the given persistent connection.
0436: *
0437: * @param login login to use to retrieve the right connection pool
0438: * @param persistentConnectionId id of the persistent connection to open
0439: * @param dmsg message in the total order queue if applicable (null otherwise)
0440: * @throws SQLException An exception can be thrown when it failed to open a
0441: * connection
0442: */
0443: public void openPersistentConnection(String login,
0444: long persistentConnectionId,
0445: DistributedOpenPersistentConnection dmsg)
0446: throws SQLException {
0447: boolean success = false;
0448: long entryId = -1;
0449: try {
0450: scheduler.scheduleOpenPersistentConnection(dmsg);
0451:
0452: if (recoveryLog != null)
0453: entryId = recoveryLog.logOpenPersistentConnection(
0454: login, persistentConnectionId);
0455:
0456: loadBalancer.openPersistentConnection(login,
0457: persistentConnectionId);
0458: success = true;
0459: } catch (NoMoreBackendException e) {
0460: throw e;
0461: } catch (SQLException e) {
0462: logger.warn("Failed to open persistent connection "
0463: + persistentConnectionId, e);
0464: throw e;
0465: } finally {
0466: if (recoveryLog != null)
0467: recoveryLog.logRequestCompletion(entryId, success, 0);
0468: scheduler.openPersistentConnectionCompleted(
0469: persistentConnectionId, success);
0470: }
0471: }
0472:
0473: private final Object REQUEST_ID_SYNC_OBJECT = new Object();
0474:
0475: /**
0476: * Initialize the request id with the given value (usually retrieved from the
0477: * recovery log).
0478: *
0479: * @param requestId new current request identifier
0480: */
0481: public void initializeRequestId(long requestId) {
0482: synchronized (REQUEST_ID_SYNC_OBJECT) {
0483: // Use the max operator as a safeguard: IDs may have been delivered but
0484: // not logged yet.
0485: this .requestId = Math.max(this .requestId + 1, requestId);
0486: }
0487: }
0488:
0489: /**
0490: * Return the next request identifier (monotically increasing number).
0491: *
0492: * @return a request identifier
0493: */
0494: public long getNextRequestId() {
0495: synchronized (REQUEST_ID_SYNC_OBJECT) {
0496: return requestId++;
0497: }
0498: }
0499:
0500: /**
0501: * Perform a read request and return the reply. Call first the scheduler, then
0502: * the cache (if defined) and finally the load balancer.
0503: *
0504: * @param request the request to execute
0505: * @return a <code>ControllerResultSet</code> value
0506: * @exception SQLException if an error occurs
0507: */
0508: public ControllerResultSet statementExecuteQuery(
0509: SelectRequest request) throws SQLException {
0510: // Sanity check
0511: TransactionMetaData tm = null;
0512: if (!request.isAutoCommit()) { // Check that the transaction has been started
0513: Long tid = new Long(request.getTransactionId());
0514: try {
0515: tm = getTransactionMetaData(tid);
0516: } catch (SQLException e) {
0517: throw new SQLException(Translate.get(
0518: "transaction.not.started", tid));
0519: }
0520: }
0521:
0522: getParsingFromCacheOrParse(request);
0523:
0524: //
0525: // SCHEDULER
0526: //
0527:
0528: if (logger.isDebugEnabled())
0529: logger.debug(Translate.get(
0530: "requestmanager.read.request.schedule",
0531: new String[] {
0532: String.valueOf(request.getId()),
0533: request.getSqlShortForm(vdb
0534: .getSqlShortFormLength()) }));
0535:
0536: // Wait for the scheduler to give us the authorization to execute
0537: scheduler.scheduleReadRequest(request);
0538:
0539: //
0540: // CACHE
0541: //
0542:
0543: ControllerResultSet result = null;
0544: try { // Check cache if any
0545: if ((resultCache != null) && (!request.isMustBroadcast())) {
0546: if (logger.isDebugEnabled())
0547: logger
0548: .debug(Translate
0549: .get(
0550: "requestmanager.read.request.cache.get",
0551: new String[] {
0552: String
0553: .valueOf(request
0554: .getId()),
0555: request
0556: .getSqlShortForm(vdb
0557: .getSqlShortFormLength()) }));
0558:
0559: AbstractResultCacheEntry qce = resultCache
0560: .getFromCache(request, true);
0561: if (qce != null) {
0562: result = qce.getResult();
0563: if (result != null) { // Cache hit !
0564: if (vdb.getSQLMonitor() != null)
0565: vdb.getSQLMonitor().logCacheHit(request);
0566:
0567: scheduler.readCompleted(request);
0568: return result;
0569: }
0570: }
0571: }
0572:
0573: //
0574: // LOAD BALANCER
0575: //
0576:
0577: if (logger.isDebugEnabled())
0578: logger.debug(Translate.get(
0579: "requestmanager.read.request.balance",
0580: new String[] {
0581: String.valueOf(request.getId()),
0582: request.getSqlShortForm(vdb
0583: .getSqlShortFormLength()) }));
0584:
0585: // At this point, we have a result cache miss.
0586:
0587: // Send the request to the load balancer
0588: result = loadBalancer.statementExecuteQuery(request,
0589: metadataCache);
0590:
0591: //
0592: // UPDATES & NOTIFICATIONS
0593: //
0594:
0595: // Update cache
0596: if ((resultCache != null)
0597: && (request.getCacheAbility() != RequestType.UNCACHEABLE)) {
0598: if (logger.isDebugEnabled())
0599: logger
0600: .debug(Translate
0601: .get(
0602: "requestmanager.read.request.cache.update",
0603: new String[] {
0604: String
0605: .valueOf(request
0606: .getId()),
0607: request
0608: .getSqlShortForm(vdb
0609: .getSqlShortFormLength()) }));
0610:
0611: resultCache.addToCache(request, result);
0612: if (tm != null)
0613: tm.setAltersQueryResultCache(true);
0614: }
0615: } catch (Exception failed) {
0616: if (resultCache != null)
0617: resultCache.removeFromPendingQueries(request);
0618: if (failed instanceof NoMoreBackendException)
0619: throw (NoMoreBackendException) failed;
0620: String msg = Translate.get("requestmanager.request.failed",
0621: new String[] {
0622: request.getSqlShortForm(vdb
0623: .getSqlShortFormLength()),
0624: failed.getMessage() });
0625: if (failed instanceof RuntimeException)
0626: logger.warn(msg, failed);
0627: else
0628: logger.warn(msg);
0629: if (failed instanceof SQLException)
0630: throw (SQLException) failed;
0631:
0632: throw new SQLException(msg);
0633: } finally {
0634: // Notify scheduler of completion
0635: scheduler.readCompleted(request);
0636: }
0637: return result;
0638: }
0639:
0640: /**
0641: * Perform a write request and return the number of rows affected Call first
0642: * the scheduler (if defined), then notify the cache (if defined) and finally
0643: * call the load balancer.
0644: *
0645: * @param request the request to execute
0646: * @return number of rows affected
0647: * @exception SQLException if an error occurs
0648: */
0649: public ExecuteUpdateResult statementExecuteUpdate(
0650: AbstractWriteRequest request) throws SQLException {
0651: boolean hasBeenScheduled = false, schedulerHasBeenNotified = false;
0652: try {
0653: scheduleExecWriteRequest(request);
0654: hasBeenScheduled = true;
0655: ExecuteUpdateResult execWriteRequestResult = null;
0656: try {
0657: execWriteRequestResult = loadBalanceStatementExecuteUpdate(request);
0658: } catch (AllBackendsFailedException e) {
0659: String msg = Translate
0660: .get("requestmanager.write.request.failed.unexpected");
0661: logger.fatal(msg, e);
0662: endUserLogger.fatal(msg);
0663: throw new RuntimeException(msg, e);
0664: }
0665: updateAndNotifyExecWriteRequest(request,
0666: execWriteRequestResult.getUpdateCount());
0667: schedulerHasBeenNotified = true;
0668: return execWriteRequestResult;
0669: } finally {
0670: if (hasBeenScheduled && !schedulerHasBeenNotified)
0671: scheduler.writeCompleted(request);
0672: }
0673: }
0674:
0675: /**
0676: * Perform a write request and return the auto generated keys. Call first the
0677: * scheduler (if defined), then notify the cache (if defined) and finally call
0678: * the load balancer.
0679: *
0680: * @param request the request to execute
0681: * @return auto generated keys.
0682: * @exception SQLException if an error occurs
0683: */
0684: public GeneratedKeysResult statementExecuteUpdateWithKeys(
0685: AbstractWriteRequest request) throws SQLException {
0686: boolean hasBeenScheduled = false, schedulerHasBeenNotified = false;
0687: try {
0688: scheduleExecWriteRequest(request);
0689: hasBeenScheduled = true;
0690: GeneratedKeysResult execWriteRequestWithKeysResult = null;
0691: try {
0692: execWriteRequestWithKeysResult = loadBalanceStatementExecuteUpdateWithKeys(request);
0693: } catch (AllBackendsFailedException e) {
0694: String msg = Translate
0695: .get("requestmanager.write.request.keys.failed.unexpected");
0696: logger.fatal(msg, e);
0697: endUserLogger.fatal(msg);
0698: throw new RuntimeException(msg, e);
0699: }
0700: updateAndNotifyExecWriteRequest(request,
0701: execWriteRequestWithKeysResult.getUpdateCount());
0702: schedulerHasBeenNotified = true;
0703: return execWriteRequestWithKeysResult;
0704: } finally {
0705: if (hasBeenScheduled && !schedulerHasBeenNotified)
0706: scheduler.writeCompleted(request);
0707: }
0708: }
0709:
0710: /**
0711: * Execute a call to CallableStatement.execute() and returns a suite of
0712: * updateCount and/or ResultSets.
0713: *
0714: * @param request the stored procedure to execute
0715: * @return an <code>ExecuteResult</code> object
0716: * @throws AllBackendsFailedException if all backends failed to execute the
0717: * stored procedure
0718: * @exception SQLException if an error occurs
0719: */
0720: public ExecuteResult statementExecute(AbstractRequest request)
0721: throws AllBackendsFailedException, SQLException {
0722: // Create a fake stored procedure
0723: StoredProcedure proc = new StoredProcedure(request
0724: .getSqlOrTemplate(), request.getEscapeProcessing(),
0725: request.getTimeout(), request.getLineSeparator());
0726: proc.setIsAutoCommit(request.isAutoCommit());
0727: proc.setTransactionId(request.getTransactionId());
0728: proc.setTransactionIsolation(request.getTransactionIsolation());
0729: proc.setId(request.getId());
0730: proc.setLogin(request.getLogin());
0731: proc.setPreparedStatementParameters(request
0732: .getPreparedStatementParameters());
0733: proc.setTimeout(request.getTimeout());
0734: proc.setMaxRows(request.getMaxRows());
0735: proc.setPersistentConnection(request.isPersistentConnection());
0736: proc.setPersistentConnectionId(request
0737: .getPersistentConnectionId());
0738:
0739: boolean hasBeenScheduled = false;
0740: try {
0741: ExecuteResult result;
0742:
0743: // Schedule as a stored procedure
0744: scheduleStoredProcedure(proc);
0745: hasBeenScheduled = true;
0746:
0747: if (logger.isDebugEnabled())
0748: logger.debug(Translate.get(
0749: "requestmanager.write.stored.procedure",
0750: new String[] {
0751: String.valueOf(request.getId()),
0752: request.getSqlShortForm(vdb
0753: .getSqlShortFormLength()) }));
0754:
0755: result = loadBalanceStatementExecute(proc);
0756:
0757: updateRecoveryLogFlushCacheAndRefreshSchema(proc);
0758:
0759: // Notify scheduler of completion
0760: scheduler.storedProcedureCompleted(proc);
0761:
0762: return result;
0763: } catch (AllBackendsFailedException e) {
0764: throw e;
0765: } finally {
0766: if (hasBeenScheduled)
0767: scheduler.storedProcedureCompleted(proc);
0768: }
0769: }
0770:
0771: /**
0772: * Schedule a request for execution.
0773: *
0774: * @param request the request to execute
0775: * @throws SQLException if an error occurs
0776: */
0777: public void scheduleExecWriteRequest(AbstractWriteRequest request)
0778: throws SQLException {
0779: // Sanity check
0780: if (!request.isAutoCommit()) { // Check that the transaction has been
0781: // started
0782: long tid = request.getTransactionId();
0783: if (!transactionMetaDatas.containsKey(new Long(tid)))
0784: throw new SQLException(Translate.get(
0785: "transaction.not.started", tid));
0786: }
0787:
0788: getParsingFromCacheOrParse(request);
0789:
0790: //
0791: // SCHEDULER
0792: //
0793:
0794: if (logger.isDebugEnabled())
0795: logger.debug(Translate.get(
0796: "requestmanager.write.request.schedule",
0797: new String[] {
0798: String.valueOf(request.getId()),
0799: request.getSqlShortForm(vdb
0800: .getSqlShortFormLength()) }));
0801:
0802: // Wait for the scheduler to give us the authorization to execute
0803: try {
0804: scheduler.scheduleWriteRequest(request);
0805: } catch (RollbackException e) { // Something bad happened and we need to rollback this transaction
0806: rollback(request.getTransactionId(), true);
0807: throw new SQLException(e.getMessage());
0808: }
0809: }
0810:
0811: /**
0812: * Send the given query to the load balancer. If the request fails, the
0813: * scheduler is properly notified.
0814: *
0815: * @param request the request to execute
0816: * @return update count and auto-generated keys
0817: * @throws AllBackendsFailedException if all backends failed to execute the
0818: * query
0819: * @throws NoMoreBackendException if no backends are left to execute the
0820: * request
0821: * @throws SQLException if an error occurs
0822: */
0823: public GeneratedKeysResult loadBalanceStatementExecuteUpdateWithKeys(
0824: AbstractWriteRequest request)
0825: throws AllBackendsFailedException, NoMoreBackendException,
0826: SQLException {
0827: if (logger.isDebugEnabled())
0828: logger.debug(Translate.get(
0829: "requestmanager.write.request.balance",
0830: new String[] {
0831: String.valueOf(request.getId()),
0832: String.valueOf(request.getTransactionId()),
0833: request.getSqlShortForm(vdb
0834: .getSqlShortFormLength()) }));
0835:
0836: try { // Send the request to the load balancer
0837: return loadBalancer.statementExecuteUpdateWithKeys(request,
0838: metadataCache);
0839: } catch (Exception failed) {
0840: if (!vdb.isDistributed()) { // Notify log of failure
0841: if (recoveryLog != null)
0842: recoveryLog.logRequestCompletion(
0843: request.getLogId(), false, request
0844: .getExecTimeInMs());
0845: }
0846:
0847: String msg = Translate.get("requestmanager.request.failed",
0848: new String[] {
0849: request.getSqlShortForm(vdb
0850: .getSqlShortFormLength()),
0851: failed.getMessage() });
0852: if (failed instanceof RuntimeException)
0853: logger.warn(msg, failed);
0854:
0855: if (failed instanceof AllBackendsFailedException)
0856: throw (AllBackendsFailedException) failed;
0857: else if (failed instanceof SQLException)
0858: throw (SQLException) failed;
0859: else if (failed instanceof NoMoreBackendException)
0860: throw (NoMoreBackendException) failed;
0861: else
0862: throw new SQLException(msg);
0863: }
0864: }
0865:
0866: /**
0867: * Send the given query to the load balancer. If the request fails, the
0868: * scheduler is properly notified.
0869: *
0870: * @param request the request to execute
0871: * @throws AllBackendsFailedException if all backends failed to execute the
0872: * query
0873: * @exception NoMoreBackendException if no backends are left to execute the
0874: * request
0875: * @throws SQLException if an error occurs
0876: * @return number of modified lines
0877: */
0878: public ExecuteUpdateResult loadBalanceStatementExecuteUpdate(
0879: AbstractWriteRequest request)
0880: throws AllBackendsFailedException, NoMoreBackendException,
0881: SQLException {
0882: if (logger.isDebugEnabled())
0883: logger.debug(Translate.get(
0884: "requestmanager.write.request.balance",
0885: new String[] {
0886: String.valueOf(request.getId()),
0887: String.valueOf(request.getTransactionId()),
0888: request.getSqlShortForm(vdb
0889: .getSqlShortFormLength()) }));
0890:
0891: try { // Send the request to the load balancer
0892: if (request.isUpdate() && (resultCache != null)) { // Try the optimization if we try to update values that are already
0893: // up-to-date. Warnings will be lost anyway so forget it
0894: if (!resultCache
0895: .isUpdateNecessary((UpdateRequest) request))
0896: return new ExecuteUpdateResult(0);
0897: }
0898: return loadBalancer.statementExecuteUpdate(request);
0899: } catch (Exception failed) {
0900: if (!vdb.isDistributed()) { // Notify log of failure
0901: if (recoveryLog != null)
0902: recoveryLog.logRequestCompletion(
0903: request.getLogId(), false, request
0904: .getExecTimeInMs());
0905: }
0906:
0907: String msg = Translate.get("requestmanager.request.failed",
0908: new String[] {
0909: request.getSqlShortForm(vdb
0910: .getSqlShortFormLength()),
0911: failed.getMessage() });
0912:
0913: // Error logging
0914: if (failed instanceof RuntimeException)
0915: logger.warn(msg, failed);
0916:
0917: // Rethrow exception
0918: if (failed instanceof AllBackendsFailedException)
0919: throw (AllBackendsFailedException) failed;
0920: else if (failed instanceof SQLException)
0921: throw (SQLException) failed;
0922: else if (failed instanceof NoMoreBackendException)
0923: throw (NoMoreBackendException) failed;
0924: else
0925: throw new SQLException(msg);
0926: }
0927: }
0928:
0929: /**
0930: * Execute a request using Statement.execute() on the load balancer. Note that
0931: * we flush the cache before calling the load balancer.
0932: *
0933: * @param request the request to execute.
0934: * @return an <code>ExecuteResult</code> object
0935: * @throws SQLException if an error occurs
0936: * @throws AllBackendsFailedException if all backends failed to execute the
0937: * stored procedure
0938: */
0939: public ExecuteResult loadBalanceStatementExecute(
0940: AbstractRequest request) throws AllBackendsFailedException,
0941: SQLException {
0942: ExecuteResult result;
0943: //
0944: // CACHE
0945: //
0946:
0947: // Flush cache (if any) before
0948: if (resultCache != null) {
0949: resultCache.flushCache();
0950: }
0951:
0952: //
0953: // LOAD BALANCER
0954: //
0955:
0956: try { // Send the request to the load balancer
0957: result = loadBalancer.statementExecute(request,
0958: metadataCache);
0959: return result;
0960: } catch (Exception failed) {
0961: if (!vdb.isDistributed()) { // Notify log of failure
0962: if (recoveryLog != null)
0963: recoveryLog.logRequestCompletion(
0964: request.getLogId(), false, request
0965: .getExecTimeInMs());
0966: }
0967:
0968: String msg = Translate.get("requestmanager.request.failed",
0969: new String[] {
0970: request.getSqlShortForm(vdb
0971: .getSqlShortFormLength()),
0972: failed.getMessage() });
0973: if (failed instanceof RuntimeException)
0974: logger.warn(msg, failed);
0975:
0976: if (failed instanceof AllBackendsFailedException)
0977: throw (AllBackendsFailedException) failed;
0978: else if (failed instanceof SQLException)
0979: throw (SQLException) failed;
0980: else if (failed instanceof NoMoreBackendException)
0981: throw (NoMoreBackendException) failed;
0982: else
0983: throw (SQLException) new SQLException(msg)
0984: .initCause(failed);
0985: }
0986: }
0987:
0988: /**
0989: * Update the recovery log, cache, update the database schema if needed and
0990: * finally notify the scheduler. Note that if an error occurs, the scheduler
0991: * is always notified.
0992: *
0993: * @param request the request to execute
0994: * @param updateCount the update count if query was executed with
0995: * executeUpdate(), -1 otherwise
0996: * @throws SQLException if an error occurs
0997: */
0998: public void updateAndNotifyExecWriteRequest(
0999: AbstractWriteRequest request, int updateCount)
1000: throws SQLException {
1001: try {
1002: TransactionMetaData tm = null;
1003: if (!request.isAutoCommit()) {
1004: /*
1005: * This is a write transaction, update the transactional context so that
1006: * commit/rollback for that transaction will be logged.
1007: */
1008: tm = getTransactionMetaData(new Long(request
1009: .getTransactionId()));
1010: tm.setReadOnly(false);
1011: }
1012:
1013: // Update the recovery log (if there is one and we are not using SingleDB)
1014: if ((recoveryLog != null)
1015: && (loadBalancer.getRAIDbLevel() != RAIDbLevels.SingleDB))
1016: recoveryLog.logRequestExecuteUpdateCompletion(request
1017: .getLogId(), true, updateCount, request
1018: .getExecTimeInMs());
1019:
1020: if (request.altersSomething()) {
1021: // Start to update the query result cache first if needed (must be done
1022: // before altering the schema)
1023: if (request.altersQueryResultCache()
1024: && (resultCache != null)) {
1025: if (logger.isDebugEnabled())
1026: logger
1027: .debug(Translate
1028: .get(
1029: "requestmanager.write.request.cache.update",
1030: new String[] {
1031: String
1032: .valueOf(request
1033: .getId()),
1034: request
1035: .getSqlShortForm(vdb
1036: .getSqlShortFormLength()) }));
1037:
1038: if (tm != null)
1039: tm.setAltersQueryResultCache(true);
1040: resultCache.writeNotify(request);
1041: }
1042:
1043: // Update the schema if needed
1044: if (request.altersDatabaseSchema()
1045: && (requiredParsingGranularity != ParsingGranularities.NO_PARSING)) {
1046: if (tm != null)
1047: tm.setAltersDatabaseSchema(true);
1048:
1049: DatabaseSchema currentSchema = getDatabaseSchema();
1050: if (currentSchema == null) {
1051: // the schema can be set to null during force disable backend.
1052: // Don't try to update it (see UNICLUSTER-246), just flag it as
1053: // to be refreshed
1054: setSchemaIsDirty(true);
1055: } else {
1056: if (request.isCreate()) { // Add the table to the schema
1057: CreateRequest createRequest = (CreateRequest) request;
1058: if (createRequest.getDatabaseTable() != null) {
1059: currentSchema
1060: .addTable(new DatabaseTable(
1061: ((CreateRequest) request)
1062: .getDatabaseTable()));
1063: if (logger.isDebugEnabled())
1064: logger
1065: .debug(Translate
1066: .get(
1067: "requestmanager.schema.add.table",
1068: request
1069: .getTableName()));
1070: // TODO : right now, we ask a complete schema refresh
1071: // Optimization, we should refresh only this table
1072: setSchemaIsDirty(true);
1073: } else
1074: // Some other create statement that modifies the schema, force
1075: // refresh
1076: setSchemaIsDirty(true);
1077: } else if (request.isDrop()) { // Delete the table from the schema
1078: SortedSet tables = ((DropRequest) request)
1079: .getTablesToDrop();
1080: if (tables != null) { // Tables to drop !
1081: for (Iterator iter = tables.iterator(); iter
1082: .hasNext();) {
1083: String tableName = (String) iter
1084: .next();
1085: DatabaseTable table = currentSchema
1086: .getTable(tableName);
1087: if (table == null) {
1088: // Table not found, force refresh
1089: setSchemaIsDirty(true);
1090: } else { // Table found, let's try to update the schema
1091: if (currentSchema
1092: .removeTable(table)) {
1093: if (logger.isDebugEnabled())
1094: logger
1095: .debug(Translate
1096: .get(
1097: "requestmanager.schema.remove.table",
1098: table
1099: .getName()));
1100:
1101: // Remove table from depending tables
1102: if (logger.isDebugEnabled())
1103: logger
1104: .debug("Removing table '"
1105: + table
1106: .getName()
1107: + "' from dependending tables in request manager database schema");
1108: currentSchema
1109: .removeTableFromDependingTables(table);
1110: } else {
1111: // Table not found, force refresh
1112: setSchemaIsDirty(true);
1113: }
1114: }
1115: }
1116: }
1117: } else if (request.isAlter()
1118: && (requiredParsingGranularity > ParsingGranularities.TABLE)) { // Add or drop the column from the table
1119: AlterRequest req = (AlterRequest) request;
1120: DatabaseTable alteredTable = currentSchema
1121: .getTable(req.getTableName());
1122: if ((alteredTable != null)
1123: && (req.getColumn() != null)) {
1124: if (req.isDrop())
1125: alteredTable.removeColumn(req
1126: .getColumn().getName());
1127: else if (req.isAdd())
1128: alteredTable.addColumn(req
1129: .getColumn());
1130: else
1131: // Unsupported, force refresh
1132: setSchemaIsDirty(true);
1133: } else
1134: // Table not found, force refresh
1135: setSchemaIsDirty(true);
1136: } else
1137: // Unsupported, force refresh
1138: setSchemaIsDirty(true);
1139: }
1140: }
1141: if (request.altersMetadataCache()
1142: && (metadataCache != null)) {
1143: if (tm != null)
1144: tm.setAltersMetadataCache(true);
1145:
1146: metadataCache.flushCache();
1147: }
1148:
1149: // The following modifications are not specifically handled since there
1150: // is no cache for such structure yet
1151:
1152: // if (request.altersAggregateList())
1153: // ;
1154: // if (request.altersDatabaseCatalog())
1155: // ;
1156: // if (request.altersStoredProcedureList())
1157: // ;
1158: // if (request.altersUserDefinedTypes())
1159: // ;
1160: // if (request.altersUsers())
1161: // ;
1162:
1163: }
1164: } catch (Exception failed) {
1165: logger.fatal("---");
1166: logger.fatal("SHIT", failed);
1167: logger.fatal("---");
1168: String msg = Translate.get("requestmanager.request.failed",
1169: new String[] {
1170: request.getSqlShortForm(vdb
1171: .getSqlShortFormLength()),
1172: failed.getMessage() });
1173: if (failed instanceof RuntimeException)
1174: logger.warn(msg, failed);
1175: else
1176: logger.warn(msg);
1177: throw new SQLException(msg);
1178: } finally {
1179: // Notify scheduler
1180: scheduler.writeCompleted(request);
1181: }
1182: }
1183:
1184: /**
1185: * Call a stored procedure that returns a ResultSet.
1186: *
1187: * @param proc the stored procedure call
1188: * @return a <code>ControllerResultSet</code> value
1189: * @throws AllBackendsFailedException if all backends failed to execute the
1190: * stored procedure
1191: * @exception SQLException if an error occurs
1192: */
1193: public ControllerResultSet callableStatementExecuteQuery(
1194: StoredProcedure proc) throws AllBackendsFailedException,
1195: SQLException {
1196: ControllerResultSet result = null;
1197: boolean hasBeenScheduled = false;
1198: boolean success = false;
1199: try {
1200: scheduleStoredProcedure(proc);
1201: hasBeenScheduled = true;
1202:
1203: if (logger.isDebugEnabled())
1204: logger.debug(Translate.get(
1205: "requestmanager.read.stored.procedure",
1206: new String[] {
1207: String.valueOf(proc.getId()),
1208: proc.getSqlShortForm(vdb
1209: .getSqlShortFormLength()) }));
1210:
1211: result = loadBalanceCallableStatementExecuteQuery(proc);
1212:
1213: updateRecoveryLogFlushCacheAndRefreshSchema(proc);
1214:
1215: success = true;
1216: return result;
1217: } catch (AllBackendsFailedException e) {
1218: throw e;
1219: } catch (NoMoreBackendException e) {
1220: throw e;
1221: } catch (Exception failed) {
1222: String msg = Translate.get(
1223: "requestmanager.stored.procedure.failed",
1224: new String[] {
1225: proc.getSqlShortForm(vdb
1226: .getSqlShortFormLength()),
1227: failed.getMessage() });
1228: logger.warn(msg);
1229: if (failed instanceof SQLException) {
1230: throw (SQLException) failed;
1231: }
1232: throw new SQLException(msg);
1233: } finally {
1234: if (hasBeenScheduled)
1235: scheduler.storedProcedureCompleted(proc);
1236: if (!vdb.isDistributed() && !success) { // Notify log of failure
1237: if (recoveryLog != null)
1238: recoveryLog.logRequestCompletion(proc.getLogId(),
1239: false, proc.getExecTimeInMs());
1240: }
1241: }
1242: }
1243:
1244: /**
1245: * Call a stored procedure that performs an update.
1246: *
1247: * @param proc the stored procedure call
1248: * @return number of rows affected
1249: * @throws AllBackendsFailedException if all backends failed to execute the
1250: * stored procedure
1251: * @exception SQLException if an error occurs
1252: */
1253: public ExecuteUpdateResult callableStatementExecuteUpdate(
1254: StoredProcedure proc) throws AllBackendsFailedException,
1255: SQLException {
1256: ExecuteUpdateResult result;
1257: boolean hasBeenScheduled = false;
1258: boolean success = false;
1259: try {
1260: // Wait for the scheduler to give us the authorization to execute
1261: scheduleStoredProcedure(proc);
1262: hasBeenScheduled = true;
1263:
1264: if (logger.isDebugEnabled())
1265: logger.debug(Translate.get(
1266: "requestmanager.write.stored.procedure",
1267: new String[] {
1268: String.valueOf(proc.getId()),
1269: proc.getSqlShortForm(vdb
1270: .getSqlShortFormLength()) }));
1271:
1272: result = loadBalanceCallableStatementExecuteUpdate(proc);
1273:
1274: updateRecoveryLogFlushCacheAndRefreshSchema(proc);
1275:
1276: // Notify scheduler of completion
1277: scheduler.storedProcedureCompleted(proc);
1278:
1279: success = true;
1280: return result;
1281: } catch (AllBackendsFailedException e) {
1282: throw e;
1283: } catch (Exception failed) {
1284: String msg = Translate.get(
1285: "requestmanager.stored.procedure.failed",
1286: new String[] {
1287: proc.getSqlShortForm(vdb
1288: .getSqlShortFormLength()),
1289: failed.getMessage() });
1290: logger.warn(msg);
1291: if (failed instanceof SQLException) {
1292: throw (SQLException) failed;
1293: }
1294: throw new SQLException(msg);
1295: } finally {
1296: if (hasBeenScheduled)
1297: scheduler.storedProcedureCompleted(proc);
1298: if (!vdb.isDistributed() && !success) { // Notify log of failure
1299: if (recoveryLog != null)
1300: recoveryLog.logRequestCompletion(proc.getLogId(),
1301: false, proc.getExecTimeInMs());
1302: }
1303: }
1304: }
1305:
1306: /**
1307: * Execute a call to CallableStatement.execute() and returns a suite of
1308: * updateCount and/or ResultSets.
1309: *
1310: * @param proc the stored procedure to execute
1311: * @return an <code>ExecuteResult</code> object
1312: * @throws AllBackendsFailedException if all backends failed to execute the
1313: * stored procedure
1314: * @exception SQLException if an error occurs
1315: */
1316: public ExecuteResult callableStatementExecute(StoredProcedure proc)
1317: throws AllBackendsFailedException, SQLException {
1318: ExecuteResult result;
1319: boolean hasBeenScheduled = false;
1320: boolean success = false;
1321: try {
1322: scheduleStoredProcedure(proc);
1323: hasBeenScheduled = true;
1324:
1325: if (logger.isDebugEnabled())
1326: logger.debug(Translate.get(
1327: "requestmanager.write.stored.procedure",
1328: new String[] {
1329: String.valueOf(proc.getId()),
1330: proc.getSqlShortForm(vdb
1331: .getSqlShortFormLength()) }));
1332:
1333: result = loadBalanceCallableStatementExecute(proc);
1334:
1335: updateRecoveryLogFlushCacheAndRefreshSchema(proc);
1336:
1337: success = true;
1338: return result;
1339: } catch (AllBackendsFailedException e) {
1340: throw e;
1341: } catch (NoMoreBackendException e) {
1342: throw e;
1343: } catch (Exception failed) {
1344: String msg = Translate.get(
1345: "requestmanager.stored.procedure.failed",
1346: new String[] {
1347: proc.getSqlShortForm(vdb
1348: .getSqlShortFormLength()),
1349: failed.getMessage() });
1350: logger.warn(msg);
1351: if (failed instanceof SQLException) {
1352: throw (SQLException) failed;
1353: }
1354: throw new SQLException(msg);
1355: } finally {
1356: if (hasBeenScheduled)
1357: scheduler.storedProcedureCompleted(proc);
1358: if (!vdb.isDistributed() && !success) { // Notify log of failure
1359: if (recoveryLog != null)
1360: recoveryLog.logRequestCompletion(proc.getLogId(),
1361: false, proc.getExecTimeInMs());
1362: }
1363: }
1364: }
1365:
1366: /**
1367: * This method does some sanity check on the given stored procedure and then
1368: * tries to schedule it. Note that it is more likely that on a stored
1369: * procedure the scheduler will lock in write the entire database as it does
1370: * not know which tables are accessed by the procedure.
1371: *
1372: * @param proc the stored procedure to schedule
1373: * @throws SQLException if an error occurs
1374: */
1375: public void scheduleStoredProcedure(StoredProcedure proc)
1376: throws SQLException {
1377: // Sanity check
1378: if (!proc.isAutoCommit()) { // Check that the transaction has been started
1379: long tid = proc.getTransactionId();
1380: if (!transactionMetaDatas.containsKey(new Long(tid)))
1381: throw new SQLException(Translate.get(
1382: "transaction.not.started", tid));
1383: }
1384:
1385: getParsingFromCacheOrParse(proc);
1386:
1387: //
1388: // SCHEDULER
1389: //
1390:
1391: // Wait for the scheduler to give us the authorization to execute
1392: try {
1393: scheduler.scheduleStoredProcedure(proc);
1394: } catch (RollbackException e) { // Something bad happened and we need to rollback this transaction
1395: rollback(proc.getTransactionId(), true);
1396: throw new SQLException(e.getMessage());
1397: }
1398: }
1399:
1400: /**
1401: * Execute a read stored procedure on the load balancer. Note that we flush
1402: * the cache before calling the load balancer.
1403: *
1404: * @param proc the stored procedure to call
1405: * @return the corresponding ControllerResultSet
1406: * @throws SQLException if an error occurs
1407: * @throws AllBackendsFailedException if all backends failed to execute the
1408: * stored procedure
1409: */
1410: public ControllerResultSet loadBalanceCallableStatementExecuteQuery(
1411: StoredProcedure proc) throws SQLException,
1412: AllBackendsFailedException {
1413: DatabaseProcedureSemantic semantic = proc.getSemantic();
1414: ControllerResultSet result;
1415:
1416: //
1417: // CACHE
1418: //
1419:
1420: // Cache is always flushed unless the user has explicitely set the
1421: // connection to read-only mode in which case we assume that the
1422: // users deliberately forces the cache not to be flushed when calling
1423: // this stored procedure.
1424: if ((resultCache != null) && (!proc.isReadOnly())) {
1425: if ((semantic == null) || (semantic.isWrite()))
1426: resultCache.flushCache();
1427: }
1428:
1429: //
1430: // LOAD BALANCER
1431: //
1432:
1433: // Send the request to the load balancer
1434: if (proc.isReadOnly()
1435: || ((semantic != null) && (semantic.isReadOnly())))
1436: result = loadBalancer
1437: .readOnlyCallableStatementExecuteQuery(proc,
1438: metadataCache);
1439: else {
1440: // Disable fetch size with a distributed execution
1441: proc.setFetchSize(0);
1442: result = loadBalancer.callableStatementExecuteQuery(proc,
1443: metadataCache);
1444: }
1445: return result;
1446: }
1447:
1448: /**
1449: * Execute a write stored procedure on the load balancer. Note that we flush
1450: * the cache before calling the load balancer.
1451: *
1452: * @param proc the stored procedure to call
1453: * @return the number of updated rows
1454: * @throws SQLException if an error occurs
1455: * @throws AllBackendsFailedException if all backends failed to execute the
1456: * stored procedure
1457: */
1458: public ExecuteUpdateResult loadBalanceCallableStatementExecuteUpdate(
1459: StoredProcedure proc) throws AllBackendsFailedException,
1460: SQLException {
1461: ExecuteUpdateResult result;
1462: //
1463: // CACHE
1464: //
1465:
1466: // Flush cache (if any) before as we don't properly lock the tables
1467: if (resultCache != null) {
1468: DatabaseProcedureSemantic semantic = proc.getSemantic();
1469: if ((semantic == null) || (semantic.isWrite()))
1470: resultCache.flushCache();
1471: }
1472:
1473: //
1474: // LOAD BALANCER
1475: //
1476:
1477: // Send the request to the load balancer
1478: result = loadBalancer.callableStatementExecuteUpdate(proc);
1479: return result;
1480: }
1481:
1482: /**
1483: * Execute a write stored procedure on the load balancer. Note that we flush
1484: * the cache before calling the load balancer.
1485: *
1486: * @param proc the stored procedure to call
1487: * @return an <code>ExecuteResult</code> object
1488: * @throws SQLException if an error occurs
1489: * @throws AllBackendsFailedException if all backends failed to execute the
1490: * stored procedure
1491: */
1492: public ExecuteResult loadBalanceCallableStatementExecute(
1493: StoredProcedure proc) throws AllBackendsFailedException,
1494: SQLException {
1495: DatabaseProcedureSemantic semantic = proc.getSemantic();
1496: ExecuteResult result;
1497:
1498: //
1499: // CACHE
1500: //
1501:
1502: // Flush cache (if any) before as we don't properly lock the tables
1503: if (resultCache != null) {
1504: if ((semantic == null) || (semantic.isWrite()))
1505: resultCache.flushCache();
1506: }
1507:
1508: //
1509: // LOAD BALANCER
1510: //
1511:
1512: // Send the request to the load balancer
1513: if (proc.isReadOnly()
1514: || ((semantic != null) && (semantic.isReadOnly())))
1515: result = loadBalancer.readOnlyCallableStatementExecute(
1516: proc, metadataCache);
1517: else
1518: result = loadBalancer.callableStatementExecute(proc,
1519: metadataCache);
1520: return result;
1521: }
1522:
1523: /**
1524: * Update the recovery log with successful completion of the query, flush the
1525: * cache and force schema refresh if needed.
1526: *
1527: * @param proc the stored procedure to log
1528: * @throws SQLException if the transaction context could not be updated
1529: */
1530: public void updateRecoveryLogFlushCacheAndRefreshSchema(
1531: StoredProcedure proc) throws SQLException {
1532: // Stored procedures executing on a read-only connection don't update
1533: // anything
1534: if (proc.isReadOnly())
1535: return;
1536:
1537: DatabaseProcedureSemantic semantic = proc.getSemantic();
1538: TransactionMetaData tm = null;
1539: if (!proc.isAutoCommit())
1540: tm = getTransactionMetaData(new Long(proc
1541: .getTransactionId()));
1542:
1543: if ((semantic == null) || (semantic.hasDDLWrite())) {
1544: // Schema might have been updated, force refresh
1545: if ((semantic == null) && (logger.isDebugEnabled()))
1546: logger.debug("No semantic found for stored procedure "
1547: + proc.getSqlShortForm(vdb
1548: .getSqlShortFormLength())
1549: + ". Forcing a schema refresh.");
1550: setSchemaIsDirty(true);
1551:
1552: if (tm != null)
1553: tm.setAltersDatabaseSchema(true);
1554: }
1555:
1556: if ((semantic == null) || (semantic.isWrite())) {
1557: // Update the recovery log (if there is one and we are not using SingleDB)
1558: if ((recoveryLog != null)
1559: && (loadBalancer.getRAIDbLevel() != RAIDbLevels.SingleDB))
1560: recoveryLog.logRequestCompletion(proc.getLogId(), true,
1561: proc.getExecTimeInMs());
1562:
1563: //
1564: // Update transaction context if needed to force commit to be logged
1565: // (occurs if callableStatement.executeQuery() is called without semantic
1566: // information.
1567: //
1568: if (tm != null) {
1569: tm.setReadOnly(false);
1570: tm.setAltersQueryResultCache(true);
1571: }
1572:
1573: //
1574: // CACHE
1575: //
1576:
1577: // Flush cache (if any) after for consistency (we don't know what has been
1578: // modified by the stored procedure)
1579: if (resultCache != null)
1580: resultCache.flushCache();
1581:
1582: if (metadataCache != null)
1583: metadataCache.flushCache();
1584: }
1585: }
1586:
1587: /**
1588: * Return a ControllerResultSet containing the PreparedStatement metaData of
1589: * the given sql template
1590: *
1591: * @param request the request containing the sql template
1592: * @return an empty ControllerResultSet with the metadata
1593: * @throws SQLException if a database error occurs
1594: */
1595: public ControllerResultSet getPreparedStatementGetMetaData(
1596: AbstractRequest request) throws SQLException {
1597: return loadBalancer.getPreparedStatementGetMetaData(request);
1598: }
1599:
1600: //
1601: // Transaction management
1602: //
1603:
1604: /**
1605: * Begin a new transaction and return the corresponding transaction
1606: * identifier. This method is called from the driver when setAutoCommit(false)
1607: * is called.
1608: * <p>
1609: * Note that the transaction begin is not logged in the recovery log by this
1610: * method, you will have to call logLazyTransactionBegin.
1611: *
1612: * @param login the login used by the connection
1613: * @param isPersistentConnection true if the transaction is started on a
1614: * persistent connection
1615: * @param persistentConnectionId persistent connection id if the transaction
1616: * must be started on a persistent connection
1617: * @return long a unique transaction identifier
1618: * @throws SQLException if an error occurs
1619: * @see #logLazyTransactionBegin(long)
1620: */
1621: public long begin(String login, boolean isPersistentConnection,
1622: long persistentConnectionId) throws SQLException {
1623: long tid = scheduler.getNextTransactionId();
1624: doBegin(login, tid, isPersistentConnection,
1625: persistentConnectionId);
1626: return tid;
1627: }
1628:
1629: /**
1630: * Begins a new transaction for the given <code>login</code> and
1631: * <code>tid</code>. Informs the loadbalancer and the scheduler about this
1632: * begun transaction.
1633: *
1634: * @param login the login used by the connection
1635: * @param tid the tid associed with the transaction to begin
1636: * @param isPersistentConnection true if the transaction is started on a
1637: * persistent connection
1638: * @param persistentConnectionId persistent connection id if the transaction
1639: * must be started on a persistent connection
1640: * @throws SQLException if an error occurs
1641: */
1642: public void doBegin(String login, long tid,
1643: boolean isPersistentConnection, long persistentConnectionId)
1644: throws SQLException {
1645: try {
1646: TransactionMetaData tm = new TransactionMetaData(tid,
1647: beginTimeout, login, isPersistentConnection,
1648: persistentConnectionId);
1649: scheduler.begin(tm, false, null);
1650:
1651: if (logger.isDebugEnabled())
1652: logger.debug(Translate.get("transaction.begin", String
1653: .valueOf(tid)));
1654:
1655: try {
1656: // Send to load balancer
1657: loadBalancer.begin(tm);
1658: // the tid is stored *now* before notifying the scheduler of begin
1659: // completion to avoid releasing pending write queries before
1660: // transaction list is updated
1661: transactionMetaDatas.put(new Long(tid), tm);
1662: } catch (SQLException e) {
1663: throw e;
1664: } finally {
1665: // Notify scheduler for completion in any case
1666: scheduler.beginCompleted(tid);
1667: }
1668: } catch (RuntimeException e) {
1669: String msg = Translate
1670: .get("fatal.runtime.exception.requestmanager.begin");
1671: logger.fatal(msg, e);
1672: endUserLogger.fatal(msg);
1673: throw new SQLException(e.getMessage());
1674: }
1675: }
1676:
1677: /**
1678: * Log the begin of a transaction that is started lazily. In fact, we just log
1679: * the begin when we execute the first write request in a transaction to
1680: * prevent logging begin/commit for read-only transactions. This also prevents
1681: * a problem with backends that are disabled with a checkpoint when no request
1682: * has been played in the transaction but the begin statement has already been
1683: * logged. In that case, the transaction would not be properly replayed at
1684: * restore time.
1685: *
1686: * @param transactionId the transaction id begin to log
1687: * @throws SQLException if an error occurs
1688: */
1689: public void logLazyTransactionBegin(long transactionId)
1690: throws SQLException {
1691: try {
1692: Long tid = new Long(transactionId);
1693: TransactionMetaData tm = getTransactionMetaData(tid);
1694:
1695: if (logger.isDebugEnabled())
1696: logger.debug(Translate.get("transaction.begin.log",
1697: String.valueOf(transactionId)));
1698:
1699: // Log the begin
1700: if (recoveryLog != null)
1701: recoveryLog.logBegin(tm);
1702: } catch (RuntimeException e) {
1703: String msg = Translate
1704: .get("fatal.runtime.exception.requestmanager.begin.log");
1705: logger.fatal(msg, e);
1706: endUserLogger.fatal(msg);
1707: throw new SQLException(e.getMessage());
1708: }
1709: }
1710:
1711: /**
1712: * Abort a transaction that has been started but in which no query was
1713: * executed. As we use lazy transaction begin, there is no need to rollback
1714: * such transaction but just to cleanup the metadata associated with this not
1715: * effectively started transaction.
1716: *
1717: * @param transactionId id of the transaction to abort
1718: * @param logAbort true if the abort (in fact rollback) should be logged in
1719: * the recovery log
1720: * @param forceAbort true if the abort will be forced. Actually, abort will do
1721: * nothing when a transaction has savepoints (we do not abort the
1722: * whole transaction, so that the user can rollback to a previous
1723: * savepoint), except when the connection is closed. In this last
1724: * case, if the transaction is not aborted, it prevents future
1725: * maintenance operations such as shutdowns, enable/disable from
1726: * completing, so we have to force this abort operation. It also
1727: * applies to the DeadlockDetectionThread and the cleanup of the
1728: * VirtualDatabaseWorkerThread.
1729: * @throws SQLException if an error occurs
1730: */
1731: public void abort(long transactionId, boolean logAbort,
1732: boolean forceAbort) throws SQLException {
1733: TransactionMetaData tm;
1734: try {
1735: Long tid = new Long(transactionId);
1736: boolean tmIsFake = false;
1737: try {
1738: tm = getTransactionMetaData(tid);
1739: if (!forceAbort && tidSavepoints.get(tid) != null) {
1740: if (logger.isDebugEnabled())
1741: logger
1742: .debug("Transaction "
1743: + transactionId
1744: + " has savepoints, transaction will not be aborted");
1745: return;
1746: }
1747: } catch (SQLException e1) {
1748: logger
1749: .warn("No transaction metadata found to abort transaction "
1750: + transactionId
1751: + ". Creating a fake context for abort.");
1752: // Note that we ignore the persistent connection id (will be retrieved
1753: // by the connection manager)
1754: tm = new TransactionMetaData(transactionId, 0,
1755: RecoveryLog.UNKNOWN_USER, false, 0);
1756: tm.setReadOnly(!logAbort);
1757: tmIsFake = true;
1758: if (tidSavepoints.get(tid) != null) {
1759: if (logger.isDebugEnabled())
1760: logger
1761: .debug("Transaction "
1762: + transactionId
1763: + " has savepoints, transaction will not be aborted");
1764: return;
1765: }
1766: }
1767:
1768: VirtualDatabaseWorkerThread vdbwt = vdb
1769: .getVirtualDatabaseWorkerThreadForTransaction(transactionId);
1770: if (vdbwt != null)
1771: vdbwt.notifyAbort(transactionId);
1772:
1773: if (logger.isDebugEnabled())
1774: logger.debug(Translate.get("transaction.aborting",
1775: String.valueOf(transactionId)));
1776:
1777: // Notify the scheduler to abort which is the same as a rollback
1778: // from a scheduler point of view.
1779:
1780: boolean abortScheduled = false;
1781: try {
1782: if (!tmIsFake) {
1783: scheduler.rollback(tm, null);
1784: abortScheduled = true;
1785: }
1786:
1787: loadBalancer.abort(tm);
1788:
1789: // Update recovery log
1790: if (recoveryLog != null)
1791: recoveryLog.logRequestCompletion(tm.getLogId(),
1792: true, 0);
1793:
1794: // Invalidate the query result cache if this transaction has updated the
1795: // cache or altered the schema
1796: if ((resultCache != null)
1797: && (tm.altersQueryResultCache() || tm
1798: .altersDatabaseSchema()))
1799: resultCache.rollback(transactionId);
1800:
1801: // Check for schema modifications that need to be rollbacked
1802: if (tm.altersDatabaseSchema()) {
1803: if (metadataCache != null)
1804: metadataCache.flushCache();
1805: setSchemaIsDirty(true);
1806: }
1807: }
1808:
1809: // FIXME This whole catch block is ugly. In particular the Rollback task
1810: // is re-created. The original task is created and posted on the total
1811: // order queue by the call to loadBalancer.abort() above.
1812: // The aim is to ensure that we log the rollback when no backends are
1813: // enabled even for read only transactions. This is needed since a begin
1814: // could have been previously logged.
1815: catch (NoMoreBackendException e) {
1816: // Log the query in any case for later recovery (if the request really
1817: // failed, it will be unloged later)
1818: if (getRecoveryLog() != null) {
1819: if (logger.isDebugEnabled())
1820: logger
1821: .debug(Translate
1822: .get(
1823: "virtualdatabase.distributed.abort.logging.only",
1824: transactionId));
1825:
1826: // Wait to be sure that we log in the proper order
1827: DistributedRollback totalOrderAbort = new DistributedRollback(
1828: tm.getLogin(), transactionId);
1829: if (getLoadBalancer().waitForTotalOrder(
1830: totalOrderAbort, false))
1831: getLoadBalancer()
1832: .removeObjectFromAndNotifyTotalOrderQueue(
1833: totalOrderAbort);
1834:
1835: // Update recovery log
1836: this .getRecoveryLog().logRequestCompletion(
1837: tm.getLogId(), false, 0);
1838: e.setRecoveryLogId(tm.getLogId());
1839: e.setLogin(tm.getLogin());
1840: }
1841: throw e;
1842: }
1843:
1844: catch (SQLException e) {
1845: /*
1846: * Check if the we have to remove the entry from the total order queue
1847: * (wait to be sure that we do it in the proper order)
1848: */
1849: DistributedRollback totalOrderAbort = new DistributedRollback(
1850: tm.getLogin(), transactionId);
1851: if (loadBalancer.waitForTotalOrder(totalOrderAbort,
1852: false))
1853: loadBalancer
1854: .removeObjectFromAndNotifyTotalOrderQueue(totalOrderAbort);
1855:
1856: // Update recovery log
1857: if (recoveryLog != null)
1858: recoveryLog.logRequestCompletion(tm.getLogId(),
1859: false, 0);
1860:
1861: throw e;
1862: } finally {
1863: if (abortScheduled) // is not set if tm is fake
1864: {
1865: // Notify scheduler for completion
1866: scheduler.rollbackCompleted(tm, true);
1867: }
1868:
1869: completeTransaction(tid);
1870:
1871: if (logger.isDebugEnabled())
1872: logger.debug(Translate.get("transaction.aborted",
1873: String.valueOf(transactionId)));
1874: }
1875: } catch (RuntimeException e) {
1876: String msg = Translate
1877: .get("fatal.runtime.exception.requestmanager.abort");
1878: logger.fatal(msg, e);
1879: endUserLogger.fatal(msg);
1880: throw new SQLException(e.getMessage());
1881: }
1882: }
1883:
1884: /**
1885: * Get the TransactionMetaData for the given transaction id.
1886: *
1887: * @param tid transaction id
1888: * @return the TransactionMetaData
1889: * @throws SQLException if no marker has been found for this transaction
1890: */
1891: public TransactionMetaData getTransactionMetaData(Long tid)
1892: throws SQLException {
1893: TransactionMetaData tm = (TransactionMetaData) transactionMetaDatas
1894: .get(tid);
1895:
1896: if (tm == null)
1897: throw new SQLException(Translate
1898: .get("transaction.marker.not.found", String
1899: .valueOf(tid)));
1900:
1901: return tm;
1902: }
1903:
1904: /**
1905: * Complete the transaction by removing it from the transactionMetaDatas.
1906: *
1907: * @param tid transaction id
1908: */
1909: public void completeTransaction(Long tid) {
1910: if (loadBalancer.waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL) {
1911: transactionMetaDatas.remove(tid);
1912: tidSavepoints.remove(tid);
1913: } else { // Asynchronous execution, wait for the last backend to complete the
1914: // transaction
1915: TransactionMetaData tm = (TransactionMetaData) transactionMetaDatas
1916: .get(tid);
1917: if (tm != null) { // Check that everyone has release its locks
1918: if (tm.isLockedByBackends()) {
1919: return;
1920: }
1921: transactionMetaDatas.remove(tid);
1922: }
1923: tidSavepoints.remove(tid);
1924: }
1925: }
1926:
1927: /**
1928: * Commit a transaction given its id.
1929: *
1930: * @param transactionId the transaction id
1931: * @param logCommit true if the commit should be logged in the recovery log
1932: * @param emptyTransaction true if this transaction has not executed any
1933: * request
1934: * @throws SQLException if an error occurs
1935: */
1936: public void commit(long transactionId, boolean logCommit,
1937: boolean emptyTransaction) throws SQLException {
1938: try {
1939: Long tid = new Long(transactionId);
1940: TransactionMetaData tm = getTransactionMetaData(tid);
1941:
1942: boolean commitScheduled = false;
1943: boolean success = false;
1944: try {
1945: scheduler.commit(tm, emptyTransaction, null);
1946: commitScheduled = true;
1947: if (!emptyTransaction) {
1948: if (logger.isDebugEnabled())
1949: logger.debug(Translate.get(
1950: "transaction.commit", String
1951: .valueOf(tid)));
1952:
1953: // Send to load balancer
1954: loadBalancer.commit(tm);
1955:
1956: // Notify the cache
1957: if (resultCache != null)
1958: resultCache.commit(tm.getTransactionId());
1959: }
1960: success = true;
1961: } catch (SQLException e) {
1962: /*
1963: * Check if the we have to remove the entry from the total order queue
1964: * (wait to be sure that we do it in the proper order)
1965: */
1966: DistributedCommit totalOrderCommit = new DistributedCommit(
1967: tm.getLogin(), transactionId);
1968: if (loadBalancer.waitForTotalOrder(totalOrderCommit,
1969: false))
1970: loadBalancer
1971: .removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1972:
1973: throw e;
1974: } catch (AllBackendsFailedException e) {
1975: String msg = "All backends failed to commit transaction "
1976: + transactionId + " (" + e + ")";
1977: logger.error(msg);
1978: throw new SQLException(msg);
1979: } finally {
1980: // Update the recovery log
1981: // The recovery log will take care of read-only transactions to log
1982: // only what is required.
1983: if (recoveryLog != null && tm.getLogId() != 0)
1984: recoveryLog.logRequestCompletion(tm.getLogId(),
1985: success, 0);
1986: if (commitScheduled) {
1987: // Notify scheduler for completion
1988: scheduler.commitCompleted(tm, success);
1989: }
1990: if (success) {
1991: completeTransaction(tid);
1992: }
1993: }
1994: } catch (RuntimeException e) {
1995: String msg = Translate
1996: .get("fatal.runtime.exception.requestmanager.commit");
1997: logger.fatal(msg, e);
1998: endUserLogger.fatal(msg);
1999: throw new SQLException(e.getMessage());
2000: }
2001: }
2002:
2003: /**
2004: * Rollback a transaction given its id.
2005: *
2006: * @param transactionId the transaction id
2007: * @param logRollback true if the rollback should be logged in the recovery
2008: * log
2009: * @throws SQLException if an error occurs
2010: */
2011: public void rollback(long transactionId, boolean logRollback)
2012: throws SQLException {
2013: try {
2014: Long tid = new Long(transactionId);
2015: TransactionMetaData tm = getTransactionMetaData(tid);
2016:
2017: // Wait for the scheduler to give us the authorization to execute
2018:
2019: boolean rollbackScheduled = false;
2020: boolean success = false;
2021: try {
2022: scheduler.rollback(tm, null);
2023: rollbackScheduled = true;
2024:
2025: if (logger.isDebugEnabled())
2026: logger.debug(Translate.get("transaction.rollback",
2027: String.valueOf(transactionId)));
2028:
2029: // Send to load balancer
2030: loadBalancer.rollback(tm);
2031:
2032: // Invalidate the query result cache if this transaction has updated the
2033: // cache or altered the schema
2034: if ((resultCache != null)
2035: && (tm.altersQueryResultCache() || tm
2036: .altersDatabaseSchema()))
2037: resultCache.rollback(transactionId);
2038:
2039: // Check for schema modifications that need to be rollbacked
2040: if (tm.altersDatabaseSchema()) {
2041: if (metadataCache != null)
2042: metadataCache.flushCache();
2043: setSchemaIsDirty(true);
2044: }
2045:
2046: success = true;
2047: } catch (SQLException e) {
2048: /*
2049: * Check if the we have to remove the entry from the total order queue
2050: * (wait to be sure that we do it in the proper order)
2051: */
2052: DistributedRollback totalOrderRollback = new DistributedRollback(
2053: tm.getLogin(), transactionId);
2054: if (loadBalancer.waitForTotalOrder(totalOrderRollback,
2055: false)) {
2056: loadBalancer
2057: .removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
2058: } else if (recoveryLog != null) { // Force rollback logging even in the case of failure. The recovery
2059: // log will take care of empty or read-only transactions that do not
2060: // need to log the rollback.
2061: if (!recoveryLog.findRollbackForTransaction(tm
2062: .getTransactionId()))
2063: recoveryLog.logRollback(tm);
2064: if (!rollbackScheduled)
2065: recoveryLog.logRequestCompletion(tm.getLogId(),
2066: true, 0);
2067: }
2068:
2069: throw e;
2070: } catch (AllBackendsFailedException e) {
2071: String msg = Translate.get(
2072: "requestmanager.rollback.failed.all",
2073: new String[] { String.valueOf(transactionId),
2074: e.getMessage() });
2075: logger.error(msg);
2076: throw new SQLException(msg);
2077: } finally {
2078: if (rollbackScheduled) {
2079: // Update the recovery log
2080: if (recoveryLog != null)
2081: recoveryLog.logRequestCompletion(tm.getLogId(),
2082: true, 0);
2083:
2084: // Notify scheduler for completion
2085: scheduler.rollbackCompleted(tm, success);
2086: }
2087:
2088: completeTransaction(tid);
2089: }
2090: } catch (RuntimeException e) {
2091: String msg = Translate
2092: .get("fatal.runtime.exception.requestmanager.rollback");
2093: logger.fatal(msg, e);
2094: endUserLogger.fatal(msg);
2095: throw new SQLException(e.getMessage());
2096: }
2097: }
2098:
2099: /**
2100: * Rollback a transaction given its id to a savepoint given its name.
2101: *
2102: * @param transactionId the transaction id
2103: * @param savepointName the name of the savepoint
2104: * @throws SQLException if an error occurs
2105: */
2106: public void rollback(long transactionId, String savepointName)
2107: throws SQLException {
2108: try {
2109: Long tid = new Long(transactionId);
2110: TransactionMetaData tm = getTransactionMetaData(tid);
2111:
2112: boolean rollbackScheduled = false;
2113: boolean validSavepoint = false;
2114:
2115: try {
2116: // Check that a savepoint with given name has been set
2117: if (!hasSavepoint(tid, savepointName))
2118: throw new SQLException(Translate.get(
2119: "transaction.savepoint.not.found",
2120: new String[] { savepointName,
2121: String.valueOf(transactionId) }));
2122:
2123: validSavepoint = true;
2124:
2125: // Wait for the scheduler to give us the authorization to execute
2126: scheduler.rollback(tm, savepointName, null);
2127: rollbackScheduled = true;
2128:
2129: if (logger.isDebugEnabled())
2130: logger.debug(Translate.get(
2131: "transaction.rollbacksavepoint",
2132: new String[] {
2133: String.valueOf(transactionId),
2134: savepointName }));
2135:
2136: // Send to loadbalancer
2137: loadBalancer.rollbackToSavepoint(tm, savepointName);
2138:
2139: // Update the recovery log
2140: if (recoveryLog != null)
2141: recoveryLog.logRequestCompletion(tm.getLogId(),
2142: true, 0);
2143:
2144: // Invalidate the query result cache if this transaction has updated the
2145: // cache or altered the schema
2146: if ((resultCache != null)
2147: && (tm.altersQueryResultCache() || tm
2148: .altersDatabaseSchema()))
2149: resultCache.rollback(transactionId);
2150:
2151: // Check for schema modifications that need to be rollbacked
2152: if (tm.altersDatabaseSchema()) {
2153: if (metadataCache != null)
2154: metadataCache.flushCache();
2155: setSchemaIsDirty(true);
2156: }
2157: } catch (SQLException e) {
2158: /*
2159: * Check if the we have to remove the entry from the total order queue
2160: * (wait to be sure that we do it in the proper order)
2161: */
2162: DistributedRollbackToSavepoint totalOrderRollbackToSavepoint = new DistributedRollbackToSavepoint(
2163: transactionId, savepointName);
2164: if (loadBalancer.waitForTotalOrder(
2165: totalOrderRollbackToSavepoint, false))
2166: loadBalancer
2167: .removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollbackToSavepoint);
2168:
2169: throw e;
2170: } catch (AllBackendsFailedException e) {
2171: String msg = Translate.get(
2172: "requestmanager.rollbackavepoint.failed.all",
2173: new String[] { String.valueOf(transactionId),
2174: savepointName, e.getMessage() });
2175: logger.error(msg);
2176: throw new SQLException(msg);
2177: } finally {
2178: if (rollbackScheduled) {
2179: // Notify scheduler for completion
2180: scheduler.savepointCompleted(transactionId);
2181: }
2182:
2183: if (validSavepoint)
2184: // Remove all the savepoints set after the savepoint we rollback to
2185: removeSavepoints(tid, savepointName);
2186: }
2187: } catch (RuntimeException e) {
2188: String msg = Translate
2189: .get("fatal.runtime.exception.requestmanager.rollbacksavepoint");
2190: logger.fatal(msg, e);
2191: endUserLogger.fatal(msg);
2192: throw new SQLException(e.getMessage());
2193: }
2194: }
2195:
2196: /**
2197: * Sets a unnamed savepoint to a transaction given its id.
2198: *
2199: * @param transactionId the transaction id
2200: * @return the generated id of the new savepoint
2201: * @throws SQLException if an error occurs
2202: */
2203: public int setSavepoint(long transactionId) throws SQLException {
2204: try {
2205: Long tid = new Long(transactionId);
2206: TransactionMetaData tm = getTransactionMetaData(tid);
2207: String savepointName = "unnamed savepoint";
2208: int savepointId;
2209:
2210: boolean setSavepointScheduled = false;
2211: try {
2212: // Wait for the scheduler to give us the authorization to execute
2213: savepointId = scheduler.setSavepoint(tm);
2214: setSavepointScheduled = true;
2215:
2216: savepointName = String.valueOf(savepointId);
2217:
2218: if (logger.isDebugEnabled())
2219: logger.debug(Translate.get(
2220: "transaction.setsavepoint", new String[] {
2221: savepointName,
2222: String.valueOf(transactionId) }));
2223:
2224: // Send to loadbalancer
2225: loadBalancer.setSavepoint(tm, savepointName);
2226:
2227: // Update the recovery log
2228: if (recoveryLog != null)
2229: recoveryLog.logRequestCompletion(tm.getLogId(),
2230: true, 0);
2231: } catch (AllBackendsFailedException e) {
2232: String msg = Translate.get(
2233: "requestmanager.setsavepoint.failed.all",
2234: new String[] { savepointName,
2235: String.valueOf(transactionId),
2236: e.getMessage() });
2237: logger.error(msg);
2238: throw new SQLException(msg);
2239: } catch (SQLException e) {
2240: throw e;
2241: } finally {
2242: if (setSavepointScheduled) {
2243: // Notify scheduler for completion
2244: scheduler.savepointCompleted(transactionId);
2245: }
2246: }
2247:
2248: // Add savepoint name to list of savepoints for this transaction
2249: addSavepoint(tid, savepointName);
2250: return savepointId;
2251: } catch (RuntimeException e) {
2252: String msg = Translate
2253: .get("fatal.runtime.exception.requestmanager.setsavepoint");
2254: logger.fatal(msg, e);
2255: endUserLogger.fatal(msg);
2256: throw new SQLException(e.getMessage());
2257: }
2258: }
2259:
2260: /**
2261: * Sets a savepoint given its desired name to a transaction given its id.
2262: *
2263: * @param transactionId the transaction id
2264: * @param name the desired name of the savepoint
2265: * @throws SQLException if an error occurs
2266: */
2267: public void setSavepoint(long transactionId, String name)
2268: throws SQLException {
2269: try {
2270: Long tid = new Long(transactionId);
2271: TransactionMetaData tm = getTransactionMetaData(tid);
2272:
2273: boolean setSavepointScheduled = false;
2274: try {
2275: // Wait for the scheduler to give us the authorization to execute
2276: scheduler.setSavepoint(tm, name, null);
2277: setSavepointScheduled = true;
2278:
2279: if (logger.isDebugEnabled())
2280: logger.debug(Translate.get(
2281: "transaction.setsavepoint",
2282: new String[] { name,
2283: String.valueOf(transactionId) }));
2284:
2285: // Send to loadbalancer
2286: loadBalancer.setSavepoint(tm, name);
2287:
2288: // Update the recovery log
2289: if (recoveryLog != null)
2290: recoveryLog.logRequestCompletion(tm.getLogId(),
2291: true, 0);
2292: } catch (AllBackendsFailedException e) {
2293: String msg = Translate.get(
2294: "requestmanager.setsavepoint.failed.all",
2295: new String[] { name,
2296: String.valueOf(transactionId),
2297: e.getMessage() });
2298: logger.error(msg);
2299: throw new SQLException(msg);
2300: } catch (SQLException e) {
2301: /*
2302: * Check if the we have to remove the entry from the total order queue
2303: * (wait to be sure that we do it in the proper order)
2304: */
2305: DistributedSetSavepoint totalOrderSavePoint = new DistributedSetSavepoint(
2306: tm.getLogin(), transactionId, name);
2307: if (loadBalancer.waitForTotalOrder(totalOrderSavePoint,
2308: false))
2309: loadBalancer
2310: .removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavePoint);
2311:
2312: throw e;
2313: } finally {
2314: if (setSavepointScheduled) {
2315: // Notify scheduler for completion
2316: scheduler.savepointCompleted(transactionId);
2317: }
2318: }
2319:
2320: // Add savepoint name to list of savepoints for this transaction
2321: addSavepoint(tid, name);
2322: } catch (RuntimeException e) {
2323: String msg = Translate
2324: .get("fatal.runtime.exception.requestmanager.setsavepoint");
2325: logger.fatal(msg, e);
2326: endUserLogger.fatal(msg);
2327: throw new SQLException(e.getMessage());
2328: }
2329: }
2330:
2331: /**
2332: * Releases a savepoint given its name from a transaction given its id.
2333: *
2334: * @param transactionId the transaction id
2335: * @param name the name of the savepoint
2336: * @exception SQLException if an error occurs
2337: */
2338: public void releaseSavepoint(long transactionId, String name)
2339: throws SQLException {
2340: try {
2341: Long tid = new Long(transactionId);
2342: TransactionMetaData tm = getTransactionMetaData(tid);
2343:
2344: boolean releasedSavepointScheduled = false;
2345: try {
2346: // Check that a savepoint with given name has been set
2347: if (!hasSavepoint(tid, name))
2348: throw new SQLException(Translate.get(
2349: "transaction.savepoint.not.found",
2350: new String[] { name,
2351: String.valueOf(transactionId) }));
2352:
2353: // Wait for the scheduler to give us the authorization to execute
2354: scheduler.releaseSavepoint(tm, name, null);
2355: releasedSavepointScheduled = true;
2356:
2357: if (logger.isDebugEnabled())
2358: logger.debug(Translate.get(
2359: "transaction.releasesavepoint",
2360: new String[] { name,
2361: String.valueOf(transactionId) }));
2362:
2363: // Send to loadbalancer
2364: loadBalancer.releaseSavepoint(tm, name);
2365:
2366: // Update the recovery log
2367: if (recoveryLog != null)
2368: recoveryLog.logRequestCompletion(tm.getLogId(),
2369: true, 0);
2370: } catch (SQLException e) {
2371: /*
2372: * Check if the we have to remove the entry from the total order queue
2373: * (wait to be sure that we do it in the proper order)
2374: */
2375: DistributedReleaseSavepoint totalOrderReleaseSavepoint = new DistributedReleaseSavepoint(
2376: transactionId, name);
2377: if (loadBalancer.waitForTotalOrder(
2378: totalOrderReleaseSavepoint, false))
2379: loadBalancer
2380: .removeObjectFromAndNotifyTotalOrderQueue(totalOrderReleaseSavepoint);
2381:
2382: throw e;
2383: } catch (AllBackendsFailedException e) {
2384: String msg = Translate.get(
2385: "requestmanager.releasesavepoint.failed.all",
2386: new String[] { name,
2387: String.valueOf(transactionId),
2388: e.getMessage() });
2389: logger.error(msg);
2390: throw new SQLException(msg);
2391: } finally {
2392: if (releasedSavepointScheduled) {
2393: // Notify scheduler for completion
2394: scheduler.savepointCompleted(transactionId);
2395: }
2396:
2397: // Remove savepoint for the transaction
2398: // Will log an error in case of an invalid savepoint
2399: removeSavepoint(tid, name);
2400: }
2401: } catch (RuntimeException e) {
2402: String msg = Translate
2403: .get("fatal.runtime.exception.requestmanager.releasesavepoint");
2404: logger.fatal(msg, e);
2405: endUserLogger.fatal(msg);
2406: throw new SQLException(e.getMessage());
2407: }
2408: }
2409:
2410: /**
2411: * Adds a given savepoint to a given transaction
2412: *
2413: * @param tid transaction id
2414: * @param savepointName name of the savepoint
2415: */
2416: public void addSavepoint(Long tid, String savepointName) {
2417: LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2418: if (savepoints == null) { // Lazy list creation
2419: savepoints = new LinkedList();
2420: tidSavepoints.put(tid, savepoints);
2421: }
2422:
2423: savepoints.addLast(savepointName);
2424: }
2425:
2426: /**
2427: * Removes a given savepoint for a given transaction
2428: *
2429: * @param tid transaction id
2430: * @param savepointName name of the savepoint
2431: */
2432: public void removeSavepoint(Long tid, String savepointName) {
2433: LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2434: if (savepoints == null)
2435: logger.error("No savepoints found for transaction " + tid);
2436: else
2437: savepoints.remove(savepointName);
2438: }
2439:
2440: /**
2441: * Removes all the savepoints set after a given savepoint for a given
2442: * transaction
2443: *
2444: * @param tid transaction id
2445: * @param savepointName name of the savepoint
2446: */
2447: public void removeSavepoints(Long tid, String savepointName) {
2448: LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2449: if (savepoints == null) {
2450: logger.error("No savepoints found for transaction " + tid);
2451: return;
2452: }
2453:
2454: int index = savepoints.indexOf(savepointName);
2455: if (index == -1)
2456: logger.error("No savepoint with name " + savepointName
2457: + " found " + "for transaction " + tid);
2458: else if (index < savepoints.size())
2459: savepoints.subList(index + 1, savepoints.size()).clear();
2460: }
2461:
2462: /**
2463: * Check if a given savepoint has been set for a given transaction
2464: *
2465: * @param tid transaction id
2466: * @param savepointName name of the savepoint
2467: * @return true if the savepoint exists
2468: */
2469: public boolean hasSavepoint(Long tid, String savepointName) {
2470: LinkedList savepoints = (LinkedList) tidSavepoints.get(tid);
2471: if (savepoints == null)
2472: return false;
2473:
2474: return savepoints.contains(savepointName);
2475: }
2476:
2477: //
2478: // Database Backends management
2479: //
2480:
2481: /**
2482: * Enable a backend that has been previously added to this virtual database
2483: * and that is in the disabled state.
2484: * <p>
2485: * The backend is enabled without further check.
2486: * <p>
2487: * The enableBackend method of the load balancer is called.
2488: *
2489: * @param db The database backend to enable
2490: * @throws SQLException if an error occurs
2491: */
2492: public void enableBackend(DatabaseBackend db) throws SQLException {
2493: db
2494: .setSchemaIsNeededByVdb(requiredParsingGranularity != ParsingGranularities.NO_PARSING);
2495: loadBalancer.enableBackend(db, true);
2496: logger.info(Translate
2497: .get("backend.state.enabled", db.getName()));
2498: }
2499:
2500: /**
2501: * The backend must have been previously added to this virtual database and be
2502: * in the disabled state.
2503: * <p>
2504: * All the queries since the given checkpoint are played and the backend state
2505: * is set to enabled when it is completely synchronized.
2506: * <p>
2507: * Note that the job is performed in background by a
2508: * <code>RecoverThread</code>. You can synchronize on thread termination if
2509: * you need to wait for completion of this task and listen to JMX
2510: * notifications for the success status.
2511: *
2512: * @param db The database backend to enable
2513: * @param checkpointName The checkpoint name to restart from
2514: * @return the JDBC reocver thread synchronizing the backend
2515: * @throws SQLException if an error occurs
2516: */
2517: public RecoverThread enableBackendFromCheckpoint(
2518: DatabaseBackend db, String checkpointName)
2519: throws SQLException {
2520: // Sanity checks
2521: if (recoveryLog == null) {
2522: String msg = Translate.get(
2523: "recovery.restore.checkpoint.failed.cause.null",
2524: checkpointName);
2525: logger.error(msg);
2526: throw new SQLException(msg);
2527: }
2528:
2529: if (db.getStateValue() != BackendState.DISABLED)
2530: throw new SQLException(Translate.get(
2531: "recovery.restore.backend.state.invalid", db
2532: .getName()));
2533:
2534: db
2535: .setSchemaIsNeededByVdb(requiredParsingGranularity != ParsingGranularities.NO_PARSING);
2536:
2537: RecoverThread recoverThread = new RecoverThread(scheduler,
2538: recoveryLog, db, this , checkpointName);
2539:
2540: // fire the thread and forget
2541: // exception will be reported in a jmx notification on the backend.
2542: recoverThread.start();
2543: return recoverThread;
2544: }
2545:
2546: /**
2547: * Disable a backend that is currently enabled on this virtual database.
2548: * <p>
2549: * The backend is disabled without further check.
2550: * <p>
2551: * The load balancer disabled method is called on the specified backend.
2552: *
2553: * @param db The database backend to disable
2554: * @param forceDisable true if disabling must be forced on the backend
2555: * @throws SQLException if an error occurs
2556: */
2557: public void disableBackend(DatabaseBackend db, boolean forceDisable)
2558: throws SQLException {
2559: if (db.isReadEnabled() || db.isWriteEnabled()) {
2560: loadBalancer.disableBackend(db, forceDisable);
2561: logger.info(Translate.get("backend.state.disabled", db
2562: .getName()));
2563: } else {
2564: throw new SQLException(Translate.get(
2565: "backend.already.disabled", db.getName()));
2566: }
2567: }
2568:
2569: /**
2570: * The backend must belong to this virtual database and be in the enabled
2571: * state. A checkpoint is stored with the given name and the backend is set to
2572: * the disabling state. The method then wait for all in-flight transactions to
2573: * complete before disabling completely the backend and storing its last known
2574: * checkpoint (upon success only).
2575: *
2576: * @param db The database backend to disable
2577: * @param checkpointName The checkpoint name to store
2578: * @throws SQLException if an error occurs
2579: */
2580: public void disableBackendWithCheckpoint(DatabaseBackend db,
2581: String checkpointName) throws SQLException {
2582: ArrayList backendsArrayList = new ArrayList();
2583: backendsArrayList.add(new BackendInfo(db));
2584: disableBackendsWithCheckpoint(backendsArrayList, checkpointName);
2585: }
2586:
2587: /**
2588: * Disable a list of backends. Only to store only one checkpoint, and to
2589: * disable all the backends at the same time so the the system is in a
2590: * coherent state. Consider only the backends that were enabled. The others
2591: * are left in the state they were before.
2592: *
2593: * @param backendInfos a List of BackendInfo to disable
2594: * @param checkpointName to store
2595: * @throws SQLException if an error occurs
2596: */
2597: public void disableBackendsWithCheckpoint(
2598: final ArrayList/* <BackendInfo> */backendInfos,
2599: String checkpointName) throws SQLException {
2600: // Sanity checks
2601: if (recoveryLog == null) {
2602: String msg = Translate.get(
2603: "recovery.store.checkpoint.failed.cause.null",
2604: checkpointName);
2605: logger.error(msg);
2606: throw new SQLException(msg);
2607: }
2608:
2609: // Wait for all pending writes to finish
2610: logger
2611: .info(Translate
2612: .get("requestmanager.wait.pending.writes"));
2613: scheduler.suspendNewWrites();
2614: scheduler.waitForSuspendedWritesToComplete();
2615:
2616: // Store checkpoint
2617: recoveryLog.storeCheckpoint(checkpointName);
2618: logger.info(Translate.get("recovery.checkpoint.stored",
2619: checkpointName));
2620:
2621: // Copy the list and consider only the backends that are enabled
2622: List backendsToDisable = new ArrayList();
2623: Iterator iter = backendInfos.iterator();
2624: while (iter.hasNext()) {
2625: BackendInfo backendInfo = (BackendInfo) iter.next();
2626: DatabaseBackend db;
2627: try {
2628: db = vdb.getAndCheckBackend(backendInfo.getName(),
2629: VirtualDatabase.NO_CHECK_BACKEND);
2630: if (db.isWriteEnabled()) {
2631: backendsToDisable.add(db);
2632: }
2633: } catch (VirtualDatabaseException e) {
2634: if (logger.isWarnEnabled()) {
2635: logger.warn(e.getMessage(), e);
2636: }
2637: }
2638: }
2639:
2640: // Signal all backends that they should not begin any new transaction
2641: int size = backendsToDisable.size();
2642: for (int i = 0; i < size; i++) {
2643: DatabaseBackend db = (DatabaseBackend) backendsToDisable
2644: .get(i);
2645: db.setState(BackendState.DISABLING);
2646: logger.info(Translate.get("backend.state.disabling", db
2647: .getName()));
2648: }
2649:
2650: // Resume writes
2651: logger.info(Translate
2652: .get("requestmanager.resume.pending.writes"));
2653: scheduler.resumeWrites();
2654:
2655: // Wait for all current transactions on backends to finish
2656: for (int i = 0; i < size; i++) {
2657: DatabaseBackend db = (DatabaseBackend) backendsToDisable
2658: .get(i);
2659: db
2660: .waitForAllTransactionsAndPersistentConnectionsToComplete();
2661: }
2662:
2663: // Now we can safely disable all backends
2664: for (int i = 0; i < size; i++) {
2665: DatabaseBackend db = (DatabaseBackend) backendsToDisable
2666: .get(i);
2667: db.setLastKnownCheckpoint(checkpointName);
2668: loadBalancer.disableBackend(db, true);
2669: logger.info(Translate.get("backend.state.disabled", db
2670: .getName()));
2671: }
2672: }
2673:
2674: /**
2675: * Create a backup from the content of a backend.
2676: *
2677: * @param backend the target backend to backup
2678: * @param login the login to use to connect to the database for the backup
2679: * operation
2680: * @param password the password to use to connect to the database for the
2681: * backup operation
2682: * @param dumpName the name of the dump to create
2683: * @param backuperName the logical name of the backuper to use
2684: * @param path the path where to store the dump
2685: * @param tables the list of tables to backup, null means all tables
2686: * @throws SQLException if the backup fails
2687: */
2688: public void backupBackend(DatabaseBackend backend, String login,
2689: String password, String dumpName, String backuperName,
2690: String path, ArrayList tables) throws SQLException {
2691: Backuper backuper = backupManager
2692: .getBackuperByName(backuperName);
2693: if (backuper == null)
2694: throw new SQLException("No backuper named " + backuperName
2695: + " was found.");
2696:
2697: boolean enableAfter = false;
2698: String checkpointName = vdb.buildCheckpointName("backup "
2699: + dumpName);
2700: if (backend.isReadEnabled() || backend.isWriteEnabled()) { // Disable backend and store checkpoint
2701: disableBackendWithCheckpoint(backend, checkpointName);
2702: logger.info(Translate.get("backend.state.disabled", backend
2703: .getName()));
2704: enableAfter = true;
2705: } else {
2706: if (backend.getLastKnownCheckpoint() == null) {
2707: throw new SQLException(Translate.get(
2708: "controller.backup.no.lastknown.checkpoint",
2709: backend.getName()));
2710: }
2711: }
2712: // else backend is already disabled, no checkpoint is stored here, it should
2713: // have been done at disable time.
2714:
2715: try {
2716: logger.info(Translate.get("controller.backup.start",
2717: backend.getName()));
2718:
2719: // Sanity check to be sure that no pending request is in the pipe for the
2720: // backend
2721: Vector pending = backend.getPendingRequests();
2722: if (pending.size() != 0) {
2723: if (logger.isWarnEnabled()) {
2724: int shortFormLength = this .getVirtualDatabase()
2725: .getSqlShortFormLength();
2726: for (int i = 0; i < pending.size(); i++)
2727: logger
2728: .warn("Pending:"
2729: + ((AbstractRequest) pending
2730: .get(i))
2731: .toStringShortForm(shortFormLength));
2732:
2733: logger.warn("Pending Requests:"
2734: + backend.getPendingRequests().size());
2735: logger.warn("Read enabled:"
2736: + backend.isReadEnabled());
2737: logger.warn("Write enabled:"
2738: + backend.isWriteEnabled());
2739: }
2740: throw new BackupException(Translate.get(
2741: "backend.not.ready.for.backup", pending.size()));
2742: }
2743:
2744: // Let's start the backup
2745: backend.setState(BackendState.BACKUPING);
2746: Date dumpDate = backuper.backup(backend, login, password,
2747: dumpName, path, tables);
2748: if (recoveryLog != null) {
2749: DumpInfo dump = new DumpInfo(dumpName, dumpDate, path,
2750: backuper.getDumpFormat(), backend
2751: .getLastKnownCheckpoint(), backend
2752: .getName(), "*");
2753: recoveryLog.storeDump(dump);
2754: }
2755:
2756: // Notify that a new dump is available
2757: backend
2758: .notifyJmx(SequoiaNotificationList.VIRTUALDATABASE_NEW_DUMP_LIST);
2759: } catch (BackupException be) {
2760: logger.error(Translate.get("controller.backup.failed"), be);
2761: throw new SQLException(be.getMessage());
2762: } catch (RuntimeException e) {
2763: logger.error(Translate.get("controller.backup.failed"), e);
2764: throw new SQLException(e.getMessage());
2765: } finally {
2766: // Switch from BACKUPING to DISABLED STATE
2767: backend.setState(BackendState.DISABLED);
2768: }
2769:
2770: logger.info(Translate.get("controller.backup.complete", backend
2771: .getName()));
2772:
2773: if (enableAfter) {
2774: RecoverThread thread = enableBackendFromCheckpoint(backend,
2775: checkpointName);
2776: try {
2777: thread.join();
2778: } catch (InterruptedException e) {
2779: logger.error("Recovery thread has been interrupted", e);
2780: }
2781: }
2782:
2783: }
2784:
2785: /**
2786: * Restore a dump on a specific backend. The proper Backuper is retrieved
2787: * automatically according to the dump format stored in the recovery log dump
2788: * table.
2789: * <p>
2790: * This method disables the backend and leave it disabled after recovery
2791: * process. The user has to call the <code>enableBackendFromCheckpoint</code>
2792: * after this.
2793: *
2794: * @param backend the backend to restore
2795: * @param login the login to use to connect to the database for the restore
2796: * operation
2797: * @param password the password to use to connect to the database for the
2798: * restore operation
2799: * @param dumpName the name of the dump to restore
2800: * @param tables the list of tables to restore, null means all tables
2801: * @throws BackupException if the restore operation failed
2802: */
2803: public void restoreBackendFromBackupCheckpoint(
2804: DatabaseBackend backend, String login, String password,
2805: String dumpName, ArrayList tables) throws BackupException {
2806: DumpInfo dumpInfo;
2807: try {
2808: dumpInfo = recoveryLog.getDumpInfo(dumpName);
2809: } catch (SQLException e) {
2810: throw new BackupException(
2811: "Recovery log error access occured while retrieving information for dump "
2812: + dumpName, e);
2813: }
2814: if (dumpInfo == null)
2815: throw new BackupException(
2816: "No information was found in the dump table for dump "
2817: + dumpName);
2818:
2819: Backuper backuper = backupManager.getBackuperByFormat(dumpInfo
2820: .getDumpFormat());
2821: if (backuper == null)
2822: throw new BackupException(
2823: "No backuper was found to handle dump format "
2824: + dumpInfo.getDumpFormat());
2825:
2826: if (backend.isReadEnabled())
2827: throw new BackupException(
2828: "Unable to restore a dump on an active backend. Disable the backend first.");
2829:
2830: try {
2831: backend.setState(BackendState.RESTORING);
2832:
2833: backuper.restore(backend, login, password, dumpName,
2834: dumpInfo.getDumpPath(), tables);
2835:
2836: // Set the checkpoint name corresponding to this database dump
2837: backend
2838: .setLastKnownCheckpoint(dumpInfo
2839: .getCheckpointName());
2840: backend.setState(BackendState.DISABLED);
2841: } catch (BackupException be) {
2842: backend.setState(BackendState.UNKNOWN);
2843: logger.error(Translate
2844: .get("controller.backup.recovery.failed"), be);
2845: throw be;
2846: } finally {
2847: logger.info(Translate.get(
2848: "controller.backup.recovery.done", backend
2849: .getName()));
2850: }
2851: }
2852:
2853: /**
2854: * Store all the backends checkpoint in the recoverylog
2855: *
2856: * @param databaseName the virtual database name
2857: * @param backends the <code>Arraylist</code> of backends
2858: */
2859: public void storeBackendsInfo(String databaseName,
2860: ArrayList backends) {
2861: if (recoveryLog == null)
2862: return;
2863: int size = backends.size();
2864: DatabaseBackend backend;
2865: for (int i = 0; i < size; i++) {
2866: backend = (DatabaseBackend) backends.get(i);
2867: try {
2868: recoveryLog.storeBackendRecoveryInfo(databaseName,
2869: new BackendRecoveryInfo(backend.getName(),
2870: backend.getLastKnownCheckpoint(),
2871: backend.getStateValue(), databaseName));
2872: } catch (SQLException e) {
2873: logger.error(Translate.get(
2874: "recovery.store.checkpoint.failed",
2875: new String[] { backend.getName(),
2876: e.getMessage() }), e);
2877: }
2878: }
2879: }
2880:
2881: /**
2882: * Remove a checkpoint and corresponding entries from the log table
2883: *
2884: * @param checkpointName to remove
2885: */
2886: public void removeCheckpoint(String checkpointName) {
2887: recoveryLog.removeCheckpoint(checkpointName);
2888: }
2889:
2890: //
2891: // Database schema management
2892: //
2893:
2894: /**
2895: * Get the <code>DatabaseSchema</code> used by this Request Manager.
2896: *
2897: * @return a <code>DatabaseSchema</code> value
2898: */
2899: public synchronized DatabaseSchema getDatabaseSchema() {
2900: try {
2901: // Refresh schema if needed. Note that this will break static schemas if
2902: // any
2903: if (schemaIsDirty) {
2904: if (logger.isDebugEnabled())
2905: logger
2906: .debug("Database schema is dirty, refreshing it");
2907: DatabaseSchema activeSchema = vdb
2908: .getDatabaseSchemaFromActiveBackends();
2909: if (activeSchema != null) {
2910: if (dbs == null)
2911: dbs = activeSchema;
2912: else
2913: dbs.mergeSchema(activeSchema);
2914: setSchemaIsDirty(false);
2915: }
2916: }
2917: } catch (SQLException e) {
2918: logger.error("Unable to refresh schema", e);
2919: }
2920: return dbs;
2921: }
2922:
2923: /**
2924: * Merge the given schema with the existing database schema.
2925: *
2926: * @param backendSchema The virtual database schema to merge.
2927: */
2928: public synchronized void mergeDatabaseSchema(
2929: DatabaseSchema backendSchema) {
2930: try {
2931: if (dbs == null)
2932: setDatabaseSchema(new DatabaseSchema(backendSchema),
2933: false);
2934: else {
2935: dbs.mergeSchema(backendSchema);
2936: logger
2937: .info(Translate
2938: .get("requestmanager.schema.virtualdatabase.merged.new"));
2939:
2940: if (schedulerParsingranularity != ParsingGranularities.NO_PARSING)
2941: scheduler.mergeDatabaseSchema(dbs);
2942:
2943: if (cacheParsingranularity != ParsingGranularities.NO_PARSING)
2944: resultCache.mergeDatabaseSchema(dbs);
2945: }
2946: } catch (SQLException e) {
2947: logger.error(Translate.get(
2948: "requestmanager.schema.merge.failed", e
2949: .getMessage()), e);
2950: }
2951: }
2952:
2953: /**
2954: * Sets the <code>DatabaseSchema</code> to be able to parse the requests and
2955: * find dependencies.
2956: *
2957: * @param schema a <code>DatabaseSchema</code> value
2958: * @param isStatic true if the given schema is static
2959: */
2960: public synchronized void setDatabaseSchema(DatabaseSchema schema,
2961: boolean isStatic) {
2962: if (schemaIsStatic) {
2963: if (isStatic) {
2964: logger
2965: .warn(Translate
2966: .get("requestmanager.schema.replace.static.with.new"));
2967: this .dbs = schema;
2968: } else
2969: logger
2970: .info(Translate
2971: .get("requestmanager.schema.ignore.new.dynamic"));
2972: } else {
2973: schemaIsStatic = isStatic;
2974: this .dbs = schema;
2975: logger
2976: .info(Translate
2977: .get("requestmanager.schema.set.new.virtualdatabase"));
2978: }
2979:
2980: if (schedulerParsingranularity != ParsingGranularities.NO_PARSING)
2981: scheduler.setDatabaseSchema(dbs);
2982:
2983: if (cacheParsingranularity != ParsingGranularities.NO_PARSING)
2984: resultCache.setDatabaseSchema(dbs);
2985:
2986: // Load balancers do not have a specific database schema to update
2987: }
2988:
2989: /**
2990: * Sets the schemaIsDirty value if the backend schema needs to be refreshed.
2991: *
2992: * @param schemaIsDirty The schemaIsDirty to set.
2993: */
2994: public synchronized void setSchemaIsDirty(boolean schemaIsDirty) {
2995: this .schemaIsDirty = schemaIsDirty;
2996: }
2997:
2998: //
2999: // Getter/Setter methods
3000: //
3001:
3002: /**
3003: * Returns the vdb value.
3004: *
3005: * @return Returns the vdb.
3006: */
3007: public VirtualDatabase getVirtualDatabase() {
3008: return vdb;
3009: }
3010:
3011: /**
3012: * Sets the backup manager for this recovery log
3013: *
3014: * @param currentBackupManager an instance of <code>BackupManager</code>
3015: */
3016: public void setBackupManager(BackupManager currentBackupManager) {
3017: this .backupManager = currentBackupManager;
3018: }
3019:
3020: /**
3021: * Returns the backupManager value.
3022: *
3023: * @return Returns the backupManager.
3024: */
3025: public BackupManager getBackupManager() {
3026: return backupManager;
3027: }
3028:
3029: /**
3030: * Get the Request Load Balancer used in this Request Controller.
3031: *
3032: * @return an <code>AbstractLoadBalancer</code> value
3033: */
3034: public AbstractLoadBalancer getLoadBalancer() {
3035: return loadBalancer;
3036: }
3037:
3038: /**
3039: * Set the Request Load Balancer to use in this Request Controller.
3040: *
3041: * @param loadBalancer a Request Load Balancer implementation
3042: */
3043: public void setLoadBalancer(AbstractLoadBalancer loadBalancer) {
3044: if (this .loadBalancer != null)
3045: throw new RuntimeException(
3046: "It is not possible to dynamically change a load balancer.");
3047: this .loadBalancer = loadBalancer;
3048: if (loadBalancer == null)
3049: return;
3050: loadBalancerParsingranularity = loadBalancer
3051: .getParsingGranularity();
3052: if (loadBalancerParsingranularity > requiredParsingGranularity)
3053: requiredParsingGranularity = loadBalancerParsingranularity;
3054:
3055: if (MBeanServerManager.isJmxEnabled()) {
3056: try {
3057: MBeanServerManager.registerMBean(
3058: new LoadBalancerControl(loadBalancer),
3059: JmxConstants.getLoadBalancerObjectName(vdb
3060: .getVirtualDatabaseName()));
3061: } catch (Exception e) {
3062: logger.error(Translate
3063: .get("jmx.failed.register.mbean.loadbalancer"));
3064: }
3065: }
3066:
3067: }
3068:
3069: /**
3070: * Get the result cache (if any) used in this Request Manager.
3071: *
3072: * @return an <code>AbstractResultCache</code> value or null if no Reqsult
3073: * Cache has been defined
3074: */
3075: public AbstractResultCache getResultCache() {
3076: return resultCache;
3077: }
3078:
3079: /**
3080: * Returns the metadataCache value.
3081: *
3082: * @return Returns the metadataCache.
3083: */
3084: public MetadataCache getMetadataCache() {
3085: return metadataCache;
3086: }
3087:
3088: /**
3089: * Sets the metadataCache value.
3090: *
3091: * @param metadataCache The metadataCache to set.
3092: */
3093: public void setMetadataCache(MetadataCache metadataCache) {
3094: this .metadataCache = metadataCache;
3095: }
3096:
3097: /**
3098: * Sets the ParsingCache.
3099: *
3100: * @param parsingCache The parsingCache to set.
3101: */
3102: public void setParsingCache(ParsingCache parsingCache) {
3103: parsingCache.setRequestManager(this );
3104: parsingCache.setGranularity(requiredParsingGranularity);
3105: parsingCache.setCaseSensitiveParsing(isCaseSensitiveParsing);
3106: this .parsingCache = parsingCache;
3107: }
3108:
3109: /**
3110: * Returns the Recovery Log Manager.
3111: *
3112: * @return RecoveryLog the current recovery log (null if none)
3113: */
3114: public RecoveryLog getRecoveryLog() {
3115: return recoveryLog;
3116: }
3117:
3118: /**
3119: * Sets the Recovery Log Manager.
3120: *
3121: * @param recoveryLog The log recovery to set
3122: */
3123: public void setRecoveryLog(RecoveryLog recoveryLog) {
3124: if (recoveryLog == null)
3125: return;
3126: this .recoveryLog = recoveryLog;
3127: loadBalancer.setRecoveryLog(recoveryLog);
3128: ArrayList backends = vdb.getBackends();
3129: int size = backends.size();
3130: backendStateListener = new BackendStateListener(vdb
3131: .getVirtualDatabaseName(), recoveryLog);
3132: for (int i = 0; i < size; i++)
3133: ((DatabaseBackend) backends.get(i))
3134: .setStateListener(backendStateListener);
3135: }
3136:
3137: /**
3138: * Set the Request Cache to use in this Request Controller.
3139: *
3140: * @param cache a Request Cache implementation
3141: */
3142: public void setResultCache(AbstractResultCache cache) {
3143: resultCache = cache;
3144: cacheParsingranularity = cache.getParsingGranularity();
3145: if (cacheParsingranularity > requiredParsingGranularity)
3146: requiredParsingGranularity = cacheParsingranularity;
3147: }
3148:
3149: /**
3150: * Get the Request Scheduler (if any) used in this Request Controller.
3151: *
3152: * @return an <code>AbstractScheduler</code> value or null if no Request
3153: * Scheduler has been defined
3154: */
3155: public AbstractScheduler getScheduler() {
3156: return scheduler;
3157: }
3158:
3159: /**
3160: * Set the Request Scheduler to use in this Request Controller.
3161: *
3162: * @param scheduler a Request Scheduler implementation
3163: */
3164: public void setScheduler(AbstractScheduler scheduler) {
3165: this .scheduler = scheduler;
3166: schedulerParsingranularity = scheduler.getParsingGranularity();
3167: if (schedulerParsingranularity > requiredParsingGranularity)
3168: requiredParsingGranularity = schedulerParsingranularity;
3169: }
3170:
3171: /**
3172: * Sets the parsing case sensitivity. If true the request are parsed in a case
3173: * sensitive way (table/column name must match exactly the case of the names
3174: * fetched from the database or enforced by a static schema).
3175: *
3176: * @param isCaseSensitiveParsing true if parsing is case sensitive
3177: */
3178: public void setCaseSensitiveParsing(boolean isCaseSensitiveParsing) {
3179: this .isCaseSensitiveParsing = isCaseSensitiveParsing;
3180: if (parsingCache != null)
3181: parsingCache
3182: .setCaseSensitiveParsing(isCaseSensitiveParsing);
3183: }
3184:
3185: //
3186: // Debug/Monitoring
3187: //
3188:
3189: /**
3190: * Get xml information about this Request Manager
3191: *
3192: * @return <code>String</code> in xml formatted text
3193: */
3194: public String getXml() {
3195: StringBuffer info = new StringBuffer();
3196: info.append("<" + DatabasesXmlTags.ELT_RequestManager + " "
3197: + DatabasesXmlTags.ATT_caseSensitiveParsing + "=\""
3198: + isCaseSensitiveParsing + "\" "
3199: + DatabasesXmlTags.ATT_beginTimeout + "=\""
3200: + beginTimeout / 1000 + "\" "
3201: + DatabasesXmlTags.ATT_commitTimeout + "=\""
3202: + commitTimeout / 1000 + "\" "
3203: + DatabasesXmlTags.ATT_rollbackTimeout + "=\""
3204: + rollbackTimeout / 1000 + "\">");
3205: if (scheduler != null)
3206: info.append(scheduler.getXml());
3207:
3208: if (metadataCache != null || parsingCache != null
3209: || resultCache != null) {
3210: info.append("<" + DatabasesXmlTags.ELT_RequestCache + ">");
3211: if (metadataCache != null)
3212: info.append(metadataCache.getXml());
3213: if (parsingCache != null)
3214: info.append(parsingCache.getXml());
3215: if (resultCache != null)
3216: info.append(resultCache.getXml());
3217: info.append("</" + DatabasesXmlTags.ELT_RequestCache + ">");
3218: }
3219:
3220: if (loadBalancer != null)
3221: info.append(loadBalancer.getXml());
3222: if (recoveryLog != null)
3223: info.append(this .recoveryLog.getXml());
3224: info.append("</" + DatabasesXmlTags.ELT_RequestManager + ">");
3225: return info.toString();
3226: }
3227:
3228: /**
3229: * Returns the backendStateListener value.
3230: *
3231: * @return Returns the backendStateListener.
3232: */
3233: public BackendStateListener getBackendStateListener() {
3234: return backendStateListener;
3235: }
3236:
3237: /**
3238: * Returns the beginTimeout value.
3239: *
3240: * @return Returns the beginTimeout.
3241: */
3242: public long getBeginTimeout() {
3243: return beginTimeout;
3244: }
3245:
3246: /**
3247: * Returns the cacheParsingranularity value.
3248: *
3249: * @return Returns the cacheParsingranularity.
3250: */
3251: public int getCacheParsingranularity() {
3252: return cacheParsingranularity;
3253: }
3254:
3255: /**
3256: * Sets the cacheParsingranularity value.
3257: *
3258: * @param cacheParsingranularity The cacheParsingranularity to set.
3259: */
3260: public void setCacheParsingranularity(int cacheParsingranularity) {
3261: this .cacheParsingranularity = cacheParsingranularity;
3262: }
3263:
3264: /**
3265: * Returns the commitTimeout value.
3266: *
3267: * @return Returns the commitTimeout.
3268: */
3269: public long getCommitTimeout() {
3270: return commitTimeout;
3271: }
3272:
3273: /**
3274: * Returns the number of savepoints that are defined for a given transaction
3275: *
3276: * @param tId the transaction id
3277: * @return the number of savepoints that are defined in the transaction whose
3278: * id is tId
3279: */
3280: public int getNumberOfSavepointsInTransaction(long tId) {
3281: LinkedList savepoints = (LinkedList) tidSavepoints
3282: .get(new Long(tId));
3283: if (savepoints == null) {
3284: return 0;
3285: } else
3286: return savepoints.size();
3287: }
3288:
3289: /**
3290: * Returns the requiredParsingGranularity value.
3291: *
3292: * @return Returns the requiredParsingGranularity.
3293: */
3294: public int getRequiredParsingGranularity() {
3295: return requiredParsingGranularity;
3296: }
3297:
3298: /**
3299: * Returns the rollbackTimeout value.
3300: *
3301: * @return Returns the rollbackTimeout.
3302: */
3303: public long getRollbackTimeout() {
3304: return rollbackTimeout;
3305: }
3306:
3307: /**
3308: * Returns the schedulerParsingranularity value.
3309: *
3310: * @return Returns the schedulerParsingranularity.
3311: */
3312: public int getSchedulerParsingranularity() {
3313: return schedulerParsingranularity;
3314: }
3315:
3316: /**
3317: * Sets the schedulerParsingranularity value.
3318: *
3319: * @param schedulerParsingranularity The schedulerParsingranularity to set.
3320: */
3321: public void setSchedulerParsingranularity(
3322: int schedulerParsingranularity) {
3323: this .schedulerParsingranularity = schedulerParsingranularity;
3324: }
3325:
3326: /**
3327: * Returns the schemaIsStatic value.
3328: *
3329: * @return Returns the schemaIsStatic.
3330: */
3331: public boolean isStaticSchema() {
3332: return schemaIsStatic;
3333: }
3334:
3335: /**
3336: * Sets the schemaIsStatic value.
3337: *
3338: * @param schemaIsStatic The schemaIsStatic to set.
3339: */
3340: public void setStaticSchema(boolean schemaIsStatic) {
3341: this .schemaIsStatic = schemaIsStatic;
3342: }
3343:
3344: /**
3345: * Returns the isCaseSensitiveParsing value.
3346: *
3347: * @return Returns the isCaseSensitiveParsing.
3348: */
3349: public boolean isCaseSensitiveParsing() {
3350: return isCaseSensitiveParsing;
3351: }
3352:
3353: /**
3354: * @see org.continuent.sequoia.controller.jmx.AbstractStandardMBean#getAssociatedString()
3355: */
3356: public String getAssociatedString() {
3357: return "requestmanager";
3358: }
3359:
3360: /**
3361: * Resume all transactions, writes and persistent connections.
3362: */
3363: public void resumeActivity() {
3364: scheduler.resumeWrites();
3365: scheduler.resumeNewTransactions();
3366: scheduler.resumeOpenClosePersistentConnection();
3367: }
3368:
3369: /**
3370: * Suspend all transactions, writes and persistent connections.
3371: *
3372: * @throws SQLException if an error occured while waiting for writes or
3373: * transactions to complete
3374: */
3375: public void suspendActivity() throws SQLException {
3376: // Suspend opening and closing of persistent connections
3377: scheduler.suspendOpenClosePersistentConnection();
3378:
3379: // Suspend new transactions
3380: scheduler.suspendNewTransactions();
3381:
3382: // Suspend new writes
3383: scheduler.suspendNewWrites();
3384:
3385: // Wait for suspended activity to complete
3386: scheduler.waitForSuspendedTransactionsToComplete();
3387: scheduler.waitForSuspendedWritesToComplete();
3388: scheduler.waitForPendingOpenClosePersistentConnection();
3389: }
3390:
3391: /**
3392: * {@inheritDoc}
3393: *
3394: * @see org.continuent.sequoia.common.jmx.mbeans.RequestManagerMBean#parseSqlRequest(java.lang.String,
3395: * java.lang.String)
3396: */
3397: public String parseSqlRequest(String sqlRequest,
3398: String lineSeparator) {
3399: RequestFactory requestFactory = ControllerConstants.CONTROLLER_FACTORY
3400: .getRequestFactory();
3401: AbstractRequest request = requestFactory.requestFromString(
3402: sqlRequest, false, false, 0, lineSeparator);
3403: try {
3404: request.parse(getDatabaseSchema(),
3405: ParsingGranularities.TABLE, isCaseSensitiveParsing);
3406: } catch (SQLException ignored) {
3407: }
3408: return request.getParsingResultsAsString();
3409: }
3410: }
|