0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Julie Marguerite, Greg Ward, Jess Sightler, Jean-Bernard van Zuylen, Charles Cordingley.
0023: */package org.continuent.sequoia.controller.recoverylog;
0024:
0025: import java.sql.Connection;
0026: import java.sql.DatabaseMetaData;
0027: import java.sql.PreparedStatement;
0028: import java.sql.ResultSet;
0029: import java.sql.SQLException;
0030: import java.sql.Statement;
0031: import java.sql.Timestamp;
0032: import java.util.ArrayList;
0033: import java.util.Iterator;
0034: import java.util.List;
0035: import java.util.Map;
0036: import java.util.Timer;
0037: import java.util.TimerTask;
0038: import java.util.TreeMap;
0039:
0040: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
0041: import org.continuent.sequoia.common.i18n.Translate;
0042: import org.continuent.sequoia.common.jmx.management.BackendState;
0043: import org.continuent.sequoia.common.jmx.management.DumpInfo;
0044: import org.continuent.sequoia.common.log.Trace;
0045: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0046: import org.continuent.sequoia.common.xml.XmlComponent;
0047: import org.continuent.sequoia.controller.connection.DriverManager;
0048: import org.continuent.sequoia.controller.core.ControllerConstants;
0049: import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
0050: import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask;
0051: import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
0052: import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
0053: import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
0054: import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
0055: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
0056: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
0057: import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
0058: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteQueryTask;
0059: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteTask;
0060: import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask;
0061: import org.continuent.sequoia.controller.recoverylog.events.DeleteLogEntriesAndCheckpointBetweenEvent;
0062: import org.continuent.sequoia.controller.recoverylog.events.FindClosePersistentConnectionEvent;
0063: import org.continuent.sequoia.controller.recoverylog.events.FindCommitEvent;
0064: import org.continuent.sequoia.controller.recoverylog.events.FindLastIdEvent;
0065: import org.continuent.sequoia.controller.recoverylog.events.FindRollbackEvent;
0066: import org.continuent.sequoia.controller.recoverylog.events.GetCheckpointLogEntryEvent;
0067: import org.continuent.sequoia.controller.recoverylog.events.GetCheckpointLogIdEvent;
0068: import org.continuent.sequoia.controller.recoverylog.events.GetNumberOfLogEntriesEvent;
0069: import org.continuent.sequoia.controller.recoverylog.events.GetUpdateCountEvent;
0070: import org.continuent.sequoia.controller.recoverylog.events.LogCommitEvent;
0071: import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
0072: import org.continuent.sequoia.controller.recoverylog.events.LogRequestCompletionEvent;
0073: import org.continuent.sequoia.controller.recoverylog.events.LogRequestEvent;
0074: import org.continuent.sequoia.controller.recoverylog.events.LogRollbackEvent;
0075: import org.continuent.sequoia.controller.recoverylog.events.RemoveCheckpointEvent;
0076: import org.continuent.sequoia.controller.recoverylog.events.ResetLogEvent;
0077: import org.continuent.sequoia.controller.recoverylog.events.ShiftLogEntriesEvent;
0078: import org.continuent.sequoia.controller.recoverylog.events.ShutdownLogEvent;
0079: import org.continuent.sequoia.controller.recoverylog.events.StoreCheckpointWithLogIdEvent;
0080: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
0081: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
0082: import org.continuent.sequoia.controller.requests.AbstractRequest;
0083: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0084: import org.continuent.sequoia.controller.requests.RequestFactory;
0085: import org.continuent.sequoia.controller.requests.SelectRequest;
0086: import org.continuent.sequoia.controller.requests.StoredProcedure;
0087: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
0088: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0089:
0090: /**
0091: * Recovery Log using a database accessed through JDBC.
0092: *
0093: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0094: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
0095: * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
0096: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0097: * </a>
0098: * @version 1.0
0099: */
0100: public class RecoveryLog implements XmlComponent {
0101: static final String BEGIN = "begin";
0102: private static final String COMMIT = "commit";
0103: private static final String ROLLBACK = "rollback";
0104: private static final String CLOSE_PERSISTENT_CONNECTION = "close";
0105: private static final String OPEN_PERSISTENT_CONNECTION = "open";
0106: /**
0107: * Unknown user used for transaction abort when the transactional context
0108: * cannot be retrieved
0109: */
0110: public static final String UNKNOWN_USER = "_u u_";
0111:
0112: /** Index of the log id column in the log table */
0113: public static final int COLUMN_INDEX_LOG_ID = 1;
0114: /** Index of the vlogin column in the log table */
0115: public static final int COLUMN_INDEX_VLOGIN = 2;
0116: /** Index of the sql column in the log table */
0117: public static final int COLUMN_INDEX_SQL = 3;
0118: /** Index of the sql_param column in the log table */
0119: public static final int COLUMN_INDEX_SQL_PARAMS = 4;
0120: /** Index of the auto_conn_tran column in the log table */
0121: public static final int COLUMN_INDEX_AUTO_CONN_TRAN = 5;
0122: /** Index of the transaction id column in the log table */
0123: public static final int COLUMN_INDEX_TRANSACTION_ID = 6;
0124: /** Index of the request id column in the log table */
0125: public static final int COLUMN_INDEX_REQUEST_ID = 7;
0126: /** Index of the exec_status column in the log table */
0127: public static final int COLUMN_INDEX_EXEC_STATUS = 8;
0128: /** Index of the exec_time column in the log table */
0129: public static final int COLUMN_INDEX_EXEC_TIME = 9;
0130: /** Index of the update_count column in the log table */
0131: public static final int COLUMN_INDEX_UPDATE_COUNT = 10;
0132:
0133: static Trace logger = Trace
0134: .getLogger("org.continuent.sequoia.controller.recoverylog");
0135: static Trace endUserLogger = Trace
0136: .getLogger("org.continuent.sequoia.enduser");
0137:
0138: private RequestFactory requestFactory = ControllerConstants.CONTROLLER_FACTORY
0139: .getRequestFactory();
0140:
0141: /** Number of backends currently recovering */
0142: private long recoveringNb = 0;
0143:
0144: /** Size of the pendingRecoveryTasks queue used by the recover thread */
0145: protected int recoveryBatchSize;
0146:
0147: /** Driver class name. */
0148: private String driverClassName;
0149:
0150: /** Driver name. */
0151: private String driverName;
0152:
0153: /** Driver URL. */
0154: private String url;
0155:
0156: /** User's login. */
0157: private String login;
0158:
0159: /** User's password. */
0160: private String password;
0161:
0162: /** Database connection */
0163: private Connection internalConnectionManagedByGetDatabaseConnection = null;
0164:
0165: /**
0166: * Recovery log table options.
0167: *
0168: * @see #setLogTableCreateStatement(String, String, String, String, String,
0169: * String, String, String, String, String, String, String)
0170: */
0171: private String logTableCreateTable;
0172: private String logTableName;
0173: private String logTableCreateStatement;
0174: private String logTableLogIdType;
0175: private String logTableVloginType;
0176: private String logTableSqlColumnName;
0177: private String logTableSqlType;
0178: private String logTableAutoConnTranColumnType;
0179: private String logTableTransactionIdType;
0180: private String logTableRequestIdType;
0181: private String logTableExecTimeType;
0182: private String logTableUpdateCountType;
0183: private String logTableExtraStatement;
0184:
0185: /**
0186: * Checkpoint table options.
0187: *
0188: * @see #setCheckpointTableCreateStatement(String, String, String, String,
0189: * String)
0190: */
0191: private String checkpointTableCreateTable;
0192: private String checkpointTableName;
0193: private String checkpointTableCreateStatement;
0194: private String checkpointTableNameType;
0195: private String checkpointTableLogIdType;
0196: private String checkpointTableExtraStatement;
0197:
0198: /**
0199: * Backend table options
0200: *
0201: * @see #setBackendTableCreateStatement(String, String, String, String,
0202: * String, String, String)
0203: */
0204: private String backendTableCreateStatement;
0205: private String backendTableName;
0206: private String backendTableCreateTable;
0207: private String backendTableDatabaseName;
0208: private String backendTableExtraStatement;
0209: private String backendTableCheckpointName;
0210: private String backendTableBackendState;
0211: private String backendTableBackendName;
0212:
0213: /**
0214: * Dump table options
0215: *
0216: * @see #setDumpTableCreateStatement(String, String, String, String, String,
0217: * String, String, String, String, String, String)
0218: */
0219: private String dumpTableCreateStatement;
0220: private String dumpTableCreateTable;
0221: private String dumpTableName;
0222: private String dumpTableDumpNameColumnType;
0223: private String dumpTableDumpDateColumnType;
0224: private String dumpTableDumpPathColumnType;
0225: private String dumpTableDumpFormatColumnType;
0226: private String dumpTableCheckpointNameColumnType;
0227: private String dumpTableBackendNameColumnType;
0228: private String dumpTableTablesColumnName;
0229: private String dumpTableTablesColumnType;
0230: private String dumpTableExtraStatementDefinition;
0231:
0232: /** Current maximum value of the primary key in logTableName. */
0233: private long logTableId = 0;
0234:
0235: /** Timeout for SQL requests. */
0236: private int timeout;
0237:
0238: private LoggerThread loggerThread;
0239:
0240: private boolean isShuttingDown = false;
0241:
0242: /** timer used to auto-close the jdbc connection */
0243: private Timer timer = new Timer();
0244: private int autocloseTimeout;
0245:
0246: /** optional connection-checking feature */
0247: private boolean checkConnectionValidity;
0248:
0249: /**
0250: * Creates a new <code>RecoveryLog</code> instance.
0251: *
0252: * @param driverName the driverClassName name.
0253: * @param driverClassName the driverClassName class name.
0254: * @param url the JDBC URL.
0255: * @param login the login to use to connect to the database.
0256: * @param password the password to connect to the database.
0257: * @param requestTimeout timeout in seconds for update queries.
0258: * @param recoveryBatchSize number of queries that can be accumulated into a
0259: * batch when recovering
0260: */
0261: public RecoveryLog(String driverName, String driverClassName,
0262: String url, String login, String password,
0263: int requestTimeout, int recoveryBatchSize) {
0264: this .driverName = driverName;
0265: this .driverClassName = driverClassName;
0266: this .url = url;
0267: this .login = login;
0268: this .password = password;
0269: this .timeout = requestTimeout;
0270: if (recoveryBatchSize < 1) {
0271: logger
0272: .warn("RecoveryBatchSize was set to a value lesser than 1, resetting value to 1.");
0273: recoveryBatchSize = 1;
0274: }
0275: this .recoveryBatchSize = recoveryBatchSize;
0276:
0277: if (url.startsWith("jdbc:hsqldb:file:")) {
0278: if (url.indexOf("shutdown=true") == -1) {
0279: this .url = url + ";shutdown=true";
0280: String msg = "Hsqldb RecoveryLog url has no shutdown=true option.\n"
0281: + "This prevents the recovery log from shutting down correctly.\n"
0282: + "Please update your vdb.xml file.\n"
0283: + "Setting url to '" + this .url + "'.";
0284: logger.warn(msg);
0285: endUserLogger.warn(msg);
0286: }
0287: }
0288:
0289: // Connect to the database
0290: try {
0291: getDatabaseConnection();
0292: } catch (SQLException e) {
0293: throw new RuntimeException(
0294: "Unable to connect to the database: " + e);
0295: }
0296:
0297: // Logger thread will be created in checkRecoveryLogTables()
0298: // after database has been initialized
0299: }
0300:
0301: //
0302: // Database manipulation and access
0303: //
0304:
0305: /**
0306: * used by vdb.xml parser (element Recoverylog) to enable and set optional
0307: * autoclosing mechanism. Default value is 0 (no auto-close).
0308: */
0309: public void setAutoCloseTimeout(int seconds) {
0310: autocloseTimeout = seconds;
0311: }
0312:
0313: public void setCheckConnectionValidity() {
0314: checkConnectionValidity = true;
0315: }
0316:
0317: /**
0318: * Gets a connection to the database.
0319: *
0320: * @return a connection to the database
0321: * @exception SQLException if an error occurs.
0322: * @see #invalidateInternalConnection()
0323: */
0324: protected synchronized Connection getDatabaseConnection()
0325: throws SQLException {
0326: try {
0327: if (internalConnectionManagedByGetDatabaseConnection == null) {
0328: if (logger.isDebugEnabled())
0329: logger.debug(Translate.get("recovery.jdbc.connect",
0330: new String[] { url, login }));
0331: internalConnectionManagedByGetDatabaseConnection = DriverManager
0332: .getConnection(url, login, password,
0333: driverName, driverClassName);
0334: } else {
0335: if (checkConnectionValidity)
0336: checkDatabaseConnection();
0337: }
0338:
0339: if (autocloseTimeout > 0)
0340: scheduleCloserTask();
0341:
0342: return internalConnectionManagedByGetDatabaseConnection;
0343: } catch (RuntimeException e) {
0344: String msg = Translate.get("recovery.jdbc.connect.failed",
0345: e);
0346: if (logger.isDebugEnabled())
0347: logger.debug(msg, e);
0348: throw new SQLException(msg);
0349: } catch (SQLException e) {
0350: invalidateInternalConnection();
0351: String msg = Translate.get("recovery.jdbc.connect.failed",
0352: e);
0353: if (logger.isDebugEnabled())
0354: logger.debug(msg, e);
0355: throw new SQLException(msg);
0356: }
0357: }
0358:
0359: /**
0360: * Checks that the connection to the recoverylog database is still valid and
0361: * if it is not attempts one reconnect.
0362: *
0363: * @throws SQLException if an error happens at re-connection
0364: */
0365: void checkDatabaseConnection() throws SQLException {
0366: try {
0367: internalConnectionManagedByGetDatabaseConnection
0368: .getTransactionIsolation();
0369: } catch (SQLException e) {
0370: internalConnectionManagedByGetDatabaseConnection = DriverManager
0371: .getConnection(url, login, password, driverName,
0372: driverClassName);
0373: }
0374: }
0375:
0376: void scheduleCloserTask() {
0377: timer.cancel();
0378: timer = new Timer();
0379: timer.schedule(new TimerTask() {
0380: public void run() {
0381: invalidateInternalConnection();
0382: }
0383: }, autocloseTimeout * 1000);
0384: }
0385:
0386: /**
0387: * Increments the value of logTableId.
0388: */
0389: synchronized long incrementLogTableId() {
0390: logTableId++;
0391: return logTableId;
0392: }
0393:
0394: /**
0395: * Checks if the tables (log and checkpoint) already exist, and create them if
0396: * needed.
0397: */
0398: private void intializeDatabase() throws SQLException {
0399: boolean createLogTable = true;
0400: boolean createCheckpointTable = true;
0401: boolean createBackendTable = true;
0402: boolean createDumpTable = true;
0403: Connection connection;
0404: // Check if tables exist
0405: try {
0406: connection = getDatabaseConnection();
0407:
0408: if (connection == null)
0409: throw new SQLException(Translate.get(
0410: "recovery.jdbc.connect.failed",
0411: "null connection returned by DriverManager"));
0412:
0413: connection.setAutoCommit(false);
0414: // Get DatabaseMetaData
0415: DatabaseMetaData metaData = connection.getMetaData();
0416:
0417: // Get a description of tables matching the catalog, schema, table name
0418: // and type.
0419: // Sending in null for catalog and schema drop them from the selection
0420: // criteria. Replace the last argument in the getTables method with
0421: // types below to obtain only database tables. (Sending in null
0422: // retrieves all types).
0423: String[] types = { "TABLE", "VIEW" };
0424: ResultSet rs = metaData.getTables(null, null, "%", types);
0425:
0426: // Get tables metadata
0427: String tableName;
0428: while (rs.next()) {
0429: // 1 is table catalog, 2 is table schema, 3 is table name, 4 is type
0430: tableName = rs.getString(3);
0431: if (logger.isDebugEnabled())
0432: logger.debug(Translate.get(
0433: "recovery.jdbc.table.found", tableName));
0434: if (tableName.equalsIgnoreCase(logTableName)) {
0435: if (tableName.compareTo(logTableName) != 0)
0436: logger
0437: .warn(Translate
0438: .get(
0439: "recovery.jdbc.logtable.case.mismatch",
0440: new String[] {
0441: logTableName,
0442: tableName }));
0443: createLogTable = false;
0444: // initialize logTableId
0445: PreparedStatement p = null;
0446: try {
0447: ResultSet result = null;
0448: p = connection
0449: .prepareStatement("SELECT MAX(log_id) AS max_log_id FROM "
0450: + logTableName);
0451: result = p.executeQuery();
0452: if (result.next())
0453: logTableId = result.getLong(1);
0454: else
0455: logTableId = 0;
0456: p.close();
0457: } catch (SQLException e) {
0458: try {
0459: if (p != null)
0460: p.close();
0461: } catch (Exception ignore) {
0462: }
0463: throw new RuntimeException(
0464: Translate
0465: .get(
0466: "recovery.jdbc.logtable.getvalue.failed",
0467: e));
0468: }
0469:
0470: }
0471: if (tableName.equalsIgnoreCase(checkpointTableName)) {
0472: if (tableName.compareTo(checkpointTableName) != 0)
0473: logger
0474: .warn(Translate
0475: .get(
0476: "recovery.jdbc.checkpointtable.case.mismatch",
0477: new String[] {
0478: checkpointTableName,
0479: tableName }));
0480: createCheckpointTable = false;
0481: if (logger.isDebugEnabled()) {
0482: // Dump checkpoints.
0483: StringBuffer sb = new StringBuffer();
0484: sb.append("Checkpoint list...");
0485:
0486: Map checkpoints = this .getCheckpoints();
0487: Iterator checkPointIterator = checkpoints
0488: .keySet().iterator();
0489: while (checkPointIterator.hasNext()) {
0490: String name = (String) checkPointIterator
0491: .next();
0492: String logId = (String) checkpoints
0493: .get(name);
0494: sb.append("\n");
0495: sb.append("name=[").append(name).append(
0496: "] log_id=[").append(logId).append(
0497: "]");
0498: }
0499: logger.debug(sb.toString());
0500: }
0501: } else if (tableName.equalsIgnoreCase(backendTableName)) {
0502: if (tableName.compareTo(backendTableName) != 0)
0503: logger
0504: .warn(Translate
0505: .get(
0506: "recovery.jdbc.backendtable.case.mismatch",
0507: new String[] {
0508: backendTableName,
0509: tableName }));
0510: createBackendTable = false;
0511: } else if (tableName.equalsIgnoreCase(dumpTableName)) {
0512: if (tableName.compareTo(dumpTableName) != 0)
0513: logger
0514: .warn(Translate
0515: .get(
0516: "recovery.jdbc.dumptable.case.mismatch",
0517: new String[] {
0518: backendTableName,
0519: tableName }));
0520: createDumpTable = false;
0521: }
0522: }
0523: try {
0524: connection.commit();
0525: connection.setAutoCommit(true);
0526: } catch (Exception ignore) {
0527: // Read-only transaction we don't care
0528: }
0529: } catch (SQLException e) {
0530: logger.error(Translate
0531: .get("recovery.jdbc.table.no.description"), e);
0532: throw e;
0533: }
0534:
0535: // Create the missing tables
0536: Statement stmt = null;
0537: if (createLogTable) {
0538: if (logger.isInfoEnabled())
0539: logger.info(Translate.get(
0540: "recovery.jdbc.logtable.create", logTableName));
0541: try {
0542: stmt = connection.createStatement();
0543: stmt.executeUpdate(logTableCreateStatement);
0544: stmt.close();
0545: } catch (SQLException e) {
0546: throw new SQLException(Translate.get(
0547: "recovery.jdbc.logtable.create.failed",
0548: new String[] { logTableName, e.getMessage() }));
0549: }
0550: }
0551: if (createCheckpointTable) {
0552: if (logger.isInfoEnabled())
0553: logger.info(Translate.get(
0554: "recovery.jdbc.checkpointtable.create",
0555: checkpointTableName));
0556: try {
0557: stmt = connection.createStatement();
0558: stmt.executeUpdate(checkpointTableCreateStatement);
0559: stmt.close();
0560: } catch (SQLException e) {
0561: throw new SQLException(Translate.get(
0562: "recovery.jdbc.checkpointtable.create.failed",
0563: new String[] { checkpointTableName,
0564: e.getMessage() }));
0565: }
0566:
0567: // Add an initial checkpoint in the table
0568: setInitialEmptyRecoveryLogCheckpoint();
0569: }
0570: if (createBackendTable) {
0571: if (logger.isInfoEnabled())
0572: logger.info(Translate.get(
0573: "recovery.jdbc.backendtable.create",
0574: backendTableName));
0575: try {
0576: stmt = connection.createStatement();
0577: stmt.executeUpdate(backendTableCreateStatement);
0578: stmt.close();
0579: } catch (SQLException e) {
0580: throw new SQLException(
0581: Translate
0582: .get(
0583: "recovery.jdbc.backendtable.create.failed",
0584: new String[] {
0585: backendTableName,
0586: e.getMessage() }));
0587: }
0588: }
0589: if (createDumpTable) {
0590: if (logger.isInfoEnabled())
0591: logger.info(Translate
0592: .get("recovery.jdbc.dumptable.create",
0593: dumpTableName));
0594: try {
0595: stmt = connection.createStatement();
0596: stmt.executeUpdate(dumpTableCreateStatement);
0597: stmt.close();
0598: } catch (SQLException e) {
0599: throw new SQLException(Translate.get(
0600: "recovery.jdbc.dumptable.create.failed",
0601: new String[] { dumpTableName, e.getMessage() }));
0602: }
0603: }
0604: if (stmt != null) // created some tables: consider we did an init
0605: setLastManDown();
0606: }
0607:
0608: /**
0609: * Adds a checkpoint called InitialEmptyRecoveryLog in the checkpoint table.
0610: * The checkpoint points to the current logTableId.
0611: *
0612: * @throws SQLException if an error occurs
0613: */
0614: private void setInitialEmptyRecoveryLogCheckpoint()
0615: throws SQLException {
0616: String checkpointName = "Initial_empty_recovery_log";
0617: PreparedStatement pstmt = null;
0618: try {
0619: if (logger.isDebugEnabled())
0620: logger.debug("Storing checkpoint " + checkpointName
0621: + " at request id " + logTableId);
0622: pstmt = getDatabaseConnection().prepareStatement(
0623: "INSERT INTO " + checkpointTableName
0624: + " VALUES(?,?)");
0625: pstmt.setString(1, checkpointName);
0626: pstmt.setLong(2, logTableId);
0627: pstmt.executeUpdate();
0628: pstmt.close();
0629: } catch (SQLException e) {
0630: try {
0631: if (pstmt != null)
0632: pstmt.close();
0633: } catch (Exception ignore) {
0634: }
0635: throw new SQLException(Translate.get(
0636: "recovery.jdbc.checkpoint.store.failed",
0637: new String[] { checkpointName, e.getMessage() }));
0638: }
0639: }
0640:
0641: public int getRecoveryLogSize() throws SQLException {
0642: ResultSet rs = getDatabaseConnection().createStatement()
0643: .executeQuery(
0644: "select count(*) from " + getLogTableName());
0645:
0646: if (!rs.next()) {
0647: logger.error("Failed to get count of log entries.");
0648: return -1;
0649: }
0650: return rs.getInt(1);
0651: }
0652:
0653: /**
0654: * Check the recovery log consistency for the first controller starting in a
0655: * cluster (might come back from a complete cluster outage).
0656: *
0657: * @throws SQLException if a recovery log access error occurs
0658: */
0659: public void checkRecoveryLogConsistency() throws SQLException {
0660: logger.info(Translate.get("recovery.consistency.checking"));
0661:
0662: PreparedStatement stmt = null;
0663: ResultSet rs = null;
0664: PreparedStatement updateStmt = null;
0665: try {
0666: // Look for requests with the execution status still to EXECUTING.
0667: stmt = getDatabaseConnection().prepareStatement(
0668: "SELECT * FROM " + getLogTableName()
0669: + " WHERE exec_status LIKE ?");
0670: stmt.setString(1, LogEntry.EXECUTING);
0671: rs = stmt.executeQuery();
0672:
0673: if (rs.next()) { // Change entry status to UNKNOWN
0674: updateStmt = getDatabaseConnection().prepareStatement(
0675: "UPDATE " + getLogTableName()
0676: + " SET exec_status=? WHERE log_id=?");
0677: updateStmt.setString(1, LogEntry.UNKNOWN);
0678: do {
0679: long logId = rs.getLong("log_id");
0680: if (logger.isWarnEnabled())
0681: logger
0682: .warn("Log entry "
0683: + logId
0684: + " ("
0685: + rs
0686: .getString(getLogTableSqlColumnName())
0687: + ") still has an executing status in the recovery log. Switching to unknown state.");
0688: updateStmt.setLong(2, logId);
0689: updateStmt.executeUpdate();
0690: } while (rs.next());
0691: updateStmt.close();
0692: }
0693: rs.close();
0694: stmt.close();
0695:
0696: // Look for open transactions
0697: stmt = getDatabaseConnection()
0698: .prepareStatement(
0699: "SELECT * FROM "
0700: + getLogTableName()
0701: + " WHERE "
0702: + getLogTableSqlColumnName()
0703: + " LIKE ? AND "
0704: + "transaction_id not in (SELECT transaction_id"
0705: + " FROM " + getLogTableName()
0706: + " WHERE "
0707: + getLogTableSqlColumnName()
0708: + " = ? OR "
0709: + getLogTableSqlColumnName()
0710: + " = ?) ");
0711:
0712: stmt.setString(1, BEGIN + "%");
0713: stmt.setString(2, COMMIT);
0714: stmt.setString(3, ROLLBACK);
0715:
0716: rs = stmt.executeQuery();
0717: while (rs.next()) {
0718: // Add a rollback in the log
0719: long tid = rs.getLong("transaction_id");
0720: if (logger.isWarnEnabled())
0721: logger
0722: .warn("Transaction "
0723: + tid
0724: + " has not completed. Inserting a rollback in the recovery log");
0725: long logId = logRollback(new TransactionMetaData(tid,
0726: 0, UNKNOWN_USER, false, 0));
0727: logRequestCompletion(logId, true, 0);
0728: }
0729: rs.close();
0730: } catch (SQLException e) {
0731: logger.error("Failed to check recovery log consistency", e);
0732: throw new SQLException(Translate.get(
0733: "recovery.consistency.checking.failed", e
0734: .getMessage()));
0735: } finally {
0736: try {
0737: if (rs != null)
0738: rs.close();
0739: } catch (Exception ignore) {
0740: }
0741: try {
0742: if (stmt != null)
0743: stmt.close();
0744: } catch (Exception ignore) {
0745: }
0746: try {
0747: if (updateStmt != null)
0748: updateStmt.close();
0749: } catch (Exception ignore) {
0750: }
0751: }
0752:
0753: }
0754:
0755: /**
0756: * Reset the current log table id and delete the recovery log information
0757: * older than the given checkpoint. This method also deletes all entries in
0758: * the checkpoint table. This method is asynchronous: the delete is performed
0759: * via a post to the logger thread.
0760: *
0761: * @param checkpointName the checkpoint name to delete from.
0762: * @param newCheckpointId the new checkpoint identifier
0763: * @throws SQLException if an error occurs
0764: */
0765: public void resetLogTableIdAndDeleteRecoveryLog(
0766: String checkpointName, long newCheckpointId)
0767: throws SQLException {
0768: long oldId = getCheckpointLogId(checkpointName);
0769: synchronized (this ) {
0770: // resetLog cleans the recovery log table, resets the checkpointTable and
0771: // renumber the queries since oldId (checkpoint assigned to the transfer).
0772: loggerThread.log(new ResetLogEvent(oldId, newCheckpointId,
0773: checkpointName));
0774: logTableId = newCheckpointId + logTableId - oldId;
0775: }
0776: }
0777:
0778: /**
0779: * Reset the recovery log by deleting the log table and checkpoint table. This
0780: * method also clears checkpoint names in the dump table. This method should
0781: * only be used to re-initialize the virtual database at initial startup or
0782: * after a complete cluster outage.
0783: *
0784: * @param resetLogId True if the logId has to be reset. This is used only with
0785: * "initialize <backend> force" command.
0786: * @throws SQLException if a database access error occurs
0787: */
0788: public void resetRecoveryLog(boolean resetLogId)
0789: throws SQLException {
0790: PreparedStatement pstmt = null;
0791: try {
0792: // Clean the tables
0793:
0794: if (logger.isDebugEnabled())
0795: logger.debug("Deleting " + logTableName + " table.");
0796: pstmt = getDatabaseConnection().prepareStatement(
0797: "DELETE FROM " + logTableName);
0798: pstmt.executeUpdate();
0799: pstmt.close();
0800:
0801: if (resetLogId)
0802: // Ensure logTableId points to the right place for an empty log!
0803: logTableId = 0;
0804:
0805: if (logger.isDebugEnabled())
0806: logger.debug("Deleting " + checkpointTableName
0807: + " table.");
0808: pstmt = getDatabaseConnection().prepareStatement(
0809: "DELETE FROM " + checkpointTableName);
0810: pstmt.executeUpdate();
0811: pstmt.close();
0812:
0813: if (logger.isDebugEnabled())
0814: logger
0815: .debug("Resetting checkpoint associated to dumps in "
0816: + dumpTableName + " table.");
0817: pstmt = getDatabaseConnection().prepareStatement(
0818: "UPDATE " + dumpTableName
0819: + " SET checkpoint_name=?");
0820: pstmt.setString(1, "");
0821: pstmt.executeUpdate();
0822: pstmt.close();
0823: } catch (SQLException e) {
0824: String msg = "Error while resetting recovery log";
0825: logger.error(msg, e);
0826: try {
0827: if (pstmt != null)
0828: pstmt.close();
0829: } catch (Exception ignore) {
0830: }
0831: throw new SQLException(msg + " (" + e + ")");
0832: }
0833:
0834: // Put back an initial empty recovery log in the checkpoint table
0835: setInitialEmptyRecoveryLogCheckpoint();
0836: }
0837:
0838: /**
0839: * Invalidate the connection when an error occurs so that the next call to
0840: * getDatabaseConnection() re-allocates a new connection.
0841: *
0842: * @see #getDatabaseConnection()
0843: */
0844: protected synchronized void invalidateInternalConnection() {
0845: internalConnectionManagedByGetDatabaseConnection = null;
0846: }
0847:
0848: //
0849: //
0850: // Logging related methods
0851: //
0852: //
0853:
0854: /**
0855: * Log a transaction abort. This is used only for transaction that were
0856: * started but where no request was executed, which is in fact an empty
0857: * transaction. The underlying implementation might safely discard the
0858: * corresponding begin from the log as an optimization.
0859: *
0860: * @param tm The transaction marker metadata
0861: * @return the identifier of the entry in the recovery log
0862: */
0863: public long logAbort(TransactionMetaData tm) {
0864: // We have to perform exactly the same job as a rollback
0865: return logRollback(tm);
0866: }
0867:
0868: /**
0869: * Log the beginning of a new transaction (always as a SUCCESS not EXECUTING).
0870: *
0871: * @param tm The transaction marker metadata
0872: * @return the identifier of the entry in the recovery log
0873: */
0874: public long logBegin(TransactionMetaData tm) {
0875: // Store the begin in the database
0876: long id = incrementLogTableId();
0877: LogEntry logEntry;
0878: if (tm.isPersistentConnection()) { // Append the persistent connection id to the SQL
0879: logEntry = new LogEntry(id, tm.getLogin(), BEGIN + " "
0880: + tm.getPersistentConnectionId(), null,
0881: LogEntry.TRANSACTION, tm.getTransactionId());
0882: } else {
0883: logEntry = new LogEntry(id, tm.getLogin(), BEGIN, null,
0884: LogEntry.TRANSACTION, tm.getTransactionId());
0885: }
0886: logEntry.setExecutionStatus(LogEntry.SUCCESS);
0887: loggerThread.log(new LogRequestEvent(logEntry));
0888: return id;
0889: }
0890:
0891: /**
0892: * Log the closing of the given persistent connection (always as a SUCCESS).
0893: *
0894: * @param login the login executing the request
0895: * @param persistentConnectionId the id of the persistent connection to close
0896: * @return the identifier of the entry in the recovery log
0897: */
0898: public long logClosePersistentConnection(String login,
0899: long persistentConnectionId) {
0900: long id = incrementLogTableId();
0901: LogEntry logEntry = new LogEntry(id, login,
0902: CLOSE_PERSISTENT_CONNECTION, null,
0903: LogEntry.PERSISTENT_CONNECTION, persistentConnectionId);
0904: logEntry.setExecutionStatus(LogEntry.SUCCESS);
0905: loggerThread.log(new LogRequestEvent(logEntry));
0906: return id;
0907: }
0908:
0909: /**
0910: * Log a transaction commit.
0911: *
0912: * @param tm The transaction marker metadata
0913: * @return the identifier of the entry in the recovery log
0914: */
0915: public long logCommit(TransactionMetaData tm) {
0916: long id = incrementLogTableId();
0917: loggerThread.log(new LogCommitEvent(new LogEntry(id, tm
0918: .getLogin(), COMMIT, null, LogEntry.TRANSACTION, tm
0919: .getTransactionId())));
0920: tm.setLogId(id);
0921: return id;
0922: }
0923:
0924: /**
0925: * Log a log entry in the recovery log.
0926: *
0927: * @param logEntry the log entry to to be written in the recovery log.
0928: */
0929: public void logLogEntry(LogEntry logEntry) {
0930: loggerThread.log(new LogRequestEvent(logEntry));
0931: }
0932:
0933: /**
0934: * Log the opening of the given persistent connection (always as a SUCCESS).
0935: *
0936: * @param login the login executing the request
0937: * @param persistentConnectionId the id of the persistent connection to open
0938: * @return the identifier of the entry in the recovery log
0939: */
0940: public long logOpenPersistentConnection(String login,
0941: long persistentConnectionId) {
0942: long id = incrementLogTableId();
0943: LogEntry logEntry = new LogEntry(id, login,
0944: OPEN_PERSISTENT_CONNECTION, null,
0945: LogEntry.PERSISTENT_CONNECTION, persistentConnectionId);
0946: logEntry.setExecutionStatus(LogEntry.EXECUTING);
0947: loggerThread.log(new LogRequestEvent(logEntry));
0948: return id;
0949: }
0950:
0951: /**
0952: * Log a transaction savepoint removal.
0953: *
0954: * @param tm The transaction marker metadata
0955: * @param name The name of the savepoint to log
0956: * @return the identifier of the entry in the recovery log
0957: */
0958: public long logReleaseSavepoint(TransactionMetaData tm, String name) {
0959: long id = incrementLogTableId();
0960: loggerThread.log(new LogRequestEvent(new LogEntry(id, tm
0961: .getLogin(), "release " + name, null,
0962: LogEntry.TRANSACTION, tm.getTransactionId())));
0963: return id;
0964: }
0965:
0966: private LogEntry buildLogEntry(AbstractRequest request, long id,
0967: long execTime, int updateCount) {
0968: String autoConnTrans;
0969: long tid;
0970: if (request.isAutoCommit()) {
0971: if (request.isPersistentConnection()) {
0972: autoConnTrans = LogEntry.PERSISTENT_CONNECTION;
0973: tid = request.getPersistentConnectionId();
0974: } else {
0975: autoConnTrans = LogEntry.AUTOCOMMIT;
0976: tid = 0;
0977: }
0978: } else {
0979: autoConnTrans = LogEntry.TRANSACTION;
0980: tid = request.getTransactionId();
0981: }
0982: LogEntry logEntry = new LogEntry(id, request.getLogin(),
0983: request.getSqlOrTemplate(), request
0984: .getPreparedStatementParameters(),
0985: autoConnTrans, tid, request.getEscapeProcessing(),
0986: request.getId(), execTime, updateCount);
0987: return logEntry;
0988: }
0989:
0990: /**
0991: * Log a request (read or write) that is going to execute and set the logId on
0992: * the request object.
0993: *
0994: * @param request The request to log (read or write)
0995: * @return the identifier of the entry in the recovery log
0996: */
0997: public long logRequestExecuting(AbstractRequest request) {
0998: long id = incrementLogTableId();
0999: long execTime = 0;
1000: if (request.getStartTime() != 0) {
1001: if (request.getEndTime() != 0)
1002: execTime = request.getEndTime()
1003: - request.getStartTime();
1004: else
1005: execTime = System.currentTimeMillis()
1006: - request.getStartTime();
1007: }
1008: loggerThread.log(new LogRequestEvent(buildLogEntry(request, id,
1009: execTime, 0)));
1010: request.setLogId(id);
1011: return id;
1012: }
1013:
1014: /**
1015: * Update the completion status of a request completion in the recovery log
1016: * given its id.
1017: *
1018: * @param logId recovery log id for this request as it was originally logged
1019: * @param success true if the request execution was successful
1020: * @param execTime request execution time in ms
1021: */
1022: public void logRequestCompletion(long logId, boolean success,
1023: long execTime) {
1024: logRequestExecuteUpdateCompletion(logId, success, 0, execTime);
1025: }
1026:
1027: /**
1028: * Update the completion status of a request completion in the recovery log
1029: * given its id.
1030: *
1031: * @param logId recovery log id for this request as it was originally logged
1032: * @param success true if the request execution was successful
1033: * @param execTimeInMs request execution time in ms
1034: * @param updateCount request update count when successful
1035: */
1036: public void logRequestCompletion(long logId, boolean success,
1037: long execTimeInMs, int updateCount) {
1038: logRequestExecuteUpdateCompletion(logId, success, updateCount,
1039: execTimeInMs);
1040:
1041: }
1042:
1043: /**
1044: * Update the completion status of a request executed with executeUpdate().
1045: *
1046: * @param logId recovery log id for this request as it was originally logged
1047: * @param success true if the request execution was successful
1048: * @param updateCount the number of updated rows returned by executeUpdate()
1049: * (meaningful only if success)
1050: * @param execTime request execution time in ms
1051: */
1052: public void logRequestExecuteUpdateCompletion(long logId,
1053: boolean success, int updateCount, long execTime) {
1054: loggerThread.log(new LogRequestCompletionEvent(logId, success,
1055: updateCount, execTime));
1056: }
1057:
1058: /**
1059: * Log a transaction rollback.
1060: *
1061: * @param tm The transaction marker metadata
1062: * @return the identifier of the entry in the recovery log
1063: */
1064: public long logRollback(TransactionMetaData tm) {
1065: long id = incrementLogTableId();
1066: // Some backends started a recovery process, log the rollback
1067: loggerThread.log(new LogRollbackEvent(new LogEntry(id, tm
1068: .getLogin(), ROLLBACK, null, LogEntry.TRANSACTION, tm
1069: .getTransactionId())));
1070: tm.setLogId(id);
1071: return id;
1072: }
1073:
1074: /**
1075: * Log a transaction rollback to a savepoint
1076: *
1077: * @param tm The transaxtion marker metadata
1078: * @param savepointName The name of the savepoint
1079: * @return the identifier of the entry in the recovery log
1080: */
1081: public long logRollbackToSavepoint(TransactionMetaData tm,
1082: String savepointName) {
1083: long id = incrementLogTableId();
1084: loggerThread.log(new LogRequestEvent(new LogEntry(id, tm
1085: .getLogin(), "rollback " + savepointName, null,
1086: LogEntry.TRANSACTION, tm.getTransactionId())));
1087: tm.setLogId(id);
1088: return id;
1089: }
1090:
1091: /**
1092: * Log a transaction savepoint.
1093: *
1094: * @param tm The transaction marker metadata
1095: * @param name The name of the savepoint to log
1096: * @return the identifier of the entry in the recovery log
1097: */
1098: public long logSetSavepoint(TransactionMetaData tm, String name) {
1099: long id = incrementLogTableId();
1100: loggerThread.log(new LogRequestEvent(new LogEntry(id, tm
1101: .getLogin(), "savepoint " + name, null,
1102: LogEntry.TRANSACTION, tm.getTransactionId())));
1103: tm.setLogId(id);
1104: return id;
1105: }
1106:
1107: /**
1108: * Throw an SQLException if the recovery log is shutting down
1109: *
1110: * @throws SQLException if recovery log is shutting down
1111: */
1112: private synchronized void checkIfShuttingDown() throws SQLException {
1113: if (isShuttingDown)
1114: throw new SQLException(
1115: "Recovery log is shutting down, log access has been denied");
1116: }
1117:
1118: /**
1119: * Shutdown the recovery log and all its threads by enqueueing a log shutdown
1120: * event and waiting until it is processed.
1121: */
1122: public void shutdown() {
1123: synchronized (this ) {
1124: if (isShuttingDown)
1125: return;
1126: isShuttingDown = true;
1127: }
1128:
1129: this .timer.cancel();
1130:
1131: if (loggerThread != null) {
1132: ShutdownLogEvent event = new ShutdownLogEvent();
1133: synchronized (event) {
1134: loggerThread.log(event);
1135: try {
1136: event.wait();
1137: } catch (InterruptedException e) {
1138: logger
1139: .warn(
1140: "Thread interrupted while awaiting log shutdown",
1141: e);
1142: }
1143: }
1144: try {
1145: loggerThread.join();
1146: } catch (InterruptedException e) {
1147: logger
1148: .warn(
1149: "Thread interrupted while awaiting loggerThread shutdown",
1150: e);
1151: }
1152: }
1153:
1154: if (url.startsWith("jdbc:hsqldb:file:")) {
1155: try {
1156: getDatabaseConnection().createStatement().execute(
1157: "SHUTDOWN");
1158: } catch (SQLException e) {
1159: logger.warn("Error while shuting down hsqldb", e);
1160: }
1161: }
1162: }
1163:
1164: //
1165: // Recovery process
1166: //
1167:
1168: /**
1169: * Notify the recovery log that a recovery process has started.
1170: */
1171: public synchronized void beginRecovery() {
1172: recoveringNb++;
1173: }
1174:
1175: /**
1176: * Possibly clean the recovery log after all recovery process are done. This
1177: * removes all failed statements or transactions without side effect from the
1178: * recovery log.
1179: *
1180: * @exception SQLException if an error occurs.
1181: */
1182: public void cleanRecoveryLog() throws SQLException {
1183: checkIfShuttingDown();
1184:
1185: PreparedStatement stmt = null;
1186:
1187: ResultSet rs = null;
1188: PreparedStatement pstmt = null;
1189: try {
1190: // Get the list of failed requests
1191: stmt = getDatabaseConnection().prepareStatement(
1192: "SELECT log_id," + logTableSqlColumnName + " FROM "
1193: + logTableName
1194: + " WHERE exec_status LIKE ?");
1195: stmt.setString(1, LogEntry.FAILED);
1196: rs = stmt.executeQuery();
1197: if (rs.next()) { // Check entries to see if statement can be removed
1198: pstmt = getDatabaseConnection().prepareStatement(
1199: "DELETE FROM " + logTableName
1200: + " WHERE log_id=?");
1201: do {
1202: String sql = rs.getString(2);
1203: AbstractRequest decodedRequest = requestFactory
1204: .requestFromString(sql, false, false,
1205: timeout, "\n");
1206: // For now delete all requests that are not stored procedures
1207:
1208: // TODO: Add a flag on requests to tell whether they have
1209: // unrollback-able side effects (SEQUOIA-587)
1210: if (decodedRequest instanceof StoredProcedure) {
1211: pstmt.setLong(1, rs.getLong(1));
1212: pstmt.executeUpdate();
1213: }
1214: } while (rs.next());
1215: pstmt.close();
1216: }
1217: rs.close();
1218: stmt.close();
1219: } catch (SQLException e) {
1220: invalidateInternalConnection();
1221: try {
1222: if (stmt != null)
1223: stmt.close();
1224: } catch (Exception ignore) {
1225: }
1226: try {
1227: if (pstmt != null)
1228: pstmt.close();
1229: } catch (Exception ignore) {
1230: }
1231: try {
1232: if (rs != null)
1233: rs.close();
1234: } catch (Exception ignore) {
1235: }
1236: throw new SQLException("Unable get cleanup recovery log ("
1237: + e + ")");
1238: }
1239: }
1240:
1241: /**
1242: * Notify the recovery log that a recovery process has finished. If this is
1243: * the last recovery process to finish, the cleanRecoveryLog method is called
1244: *
1245: * @see #cleanRecoveryLog()
1246: */
1247: public synchronized void endRecovery() {
1248: recoveringNb--;
1249: if (recoveringNb == 0) {
1250: try {
1251: cleanRecoveryLog();
1252: } catch (SQLException e) {
1253: logger.error(Translate.get("recovery.cleaning.failed"),
1254: e);
1255: }
1256: }
1257: }
1258:
1259: /**
1260: * Retrieve recovery information on a backend. This includes, the last known
1261: * state of the backend, and the last known checkpoint
1262: *
1263: * @param databaseName the virtual database name
1264: * @param backendName the backend name
1265: * @return <code>BackendRecoveryInfo<code> instance or <code>null</code> if the backend does not exist
1266: * @throws SQLException if a database access error occurs
1267: */
1268: public BackendRecoveryInfo getBackendRecoveryInfo(
1269: String databaseName, String backendName)
1270: throws SQLException {
1271: checkIfShuttingDown();
1272:
1273: PreparedStatement stmt = null;
1274: String checkpoint = null;
1275: int backendState = BackendState.UNKNOWN;
1276: try {
1277: // 1. Get the reference point to delete
1278: stmt = getDatabaseConnection()
1279: .prepareStatement(
1280: "SELECT * FROM "
1281: + backendTableName
1282: + " WHERE backend_name LIKE ? AND database_name LIKE ?");
1283: stmt.setString(1, backendName);
1284: stmt.setString(2, databaseName);
1285: ResultSet rs = stmt.executeQuery();
1286:
1287: if (rs.next()) {
1288: checkpoint = rs.getString("checkpoint_name");
1289: backendState = rs.getInt("backend_state");
1290: }
1291: rs.close();
1292: } catch (SQLException e) {
1293: invalidateInternalConnection();
1294: logger
1295: .info(
1296: "An error occured while retrieving backend recovery information",
1297: e);
1298: throw e;
1299: } finally {
1300: try {
1301: if (stmt != null)
1302: stmt.close();
1303: } catch (Exception ignore) {
1304: }
1305: }
1306: return new BackendRecoveryInfo(backendName, checkpoint,
1307: backendState, databaseName);
1308: }
1309:
1310: /**
1311: * Return the current log id.
1312: *
1313: * @return the current log id.
1314: */
1315: protected long getCurrentLogId() {
1316: return logTableId;
1317: }
1318:
1319: /**
1320: * Get the id of the last request logged in the recovery log.
1321: *
1322: * @param controllerId the controller ID which determines the search space for
1323: * request IDs
1324: * @return the last request id.
1325: * @throws SQLException if an error occured while retrieving the id.
1326: */
1327: public long getLastRequestId(long controllerId) throws SQLException {
1328: String request = "select max(request_id) from " + logTableName
1329: + " where (request_id > ?) and (request_id <= ?)";
1330: return getLastId(request, controllerId, null);
1331: }
1332:
1333: /**
1334: * Get the id of the last transaction logged in the recovery log.
1335: *
1336: * @param controllerId the controller ID which determines the search space for
1337: * transaction IDs
1338: * @return the last transaction id.
1339: * @throws SQLException if an error occured while retrieving the id.
1340: */
1341: public long getLastTransactionId(long controllerId)
1342: throws SQLException {
1343: String request = "select max(transaction_id) from "
1344: + logTableName
1345: + " where (transaction_id > ?) and (transaction_id <= ?)";
1346: return getLastId(request, controllerId, null);
1347: }
1348:
1349: /**
1350: * Get the id of the last transaction logged in the recovery log.
1351: *
1352: * @param controllerId the controller ID which determines the search space for
1353: * transaction IDs
1354: * @return the last transaction id.
1355: * @throws SQLException if an error occured while retrieving the id.
1356: */
1357: public long getLastConnectionId(long controllerId)
1358: throws SQLException {
1359: String request = "select max(transaction_id) from "
1360: + logTableName
1361: + " where (transaction_id > ?) and (transaction_id <= ?) "
1362: + " and " + getLogTableSqlColumnName() + " like ?";
1363: return getLastId(request, controllerId,
1364: OPEN_PERSISTENT_CONNECTION);
1365: }
1366:
1367: /**
1368: * Return the last ID found in the log which was allocated for (by) a given
1369: * controller ID if any, or controller ID if no previous id was used in this
1370: * space.<br>
1371: * Helper method used to get last transaction/request/connection IDs.
1372: *
1373: * @param controllerId the controller ID which determines the search space for
1374: * already used IDs.
1375: * @param sql optional SQL to match (null if useless)
1376: * @return the last ID found in the log which was allocated for controller ID
1377: * if any, or controller ID if no previous ID was used in this space.
1378: * @throws SQLException if an error occured while inspecting the log.
1379: */
1380: private long getLastId(String request, long controllerId, String sql)
1381: throws SQLException {
1382: checkIfShuttingDown();
1383:
1384: FindLastIdEvent event = new FindLastIdEvent(
1385: getDatabaseConnection(), this , request, controllerId,
1386: sql);
1387: synchronized (event) {
1388: loggerThread.log(event);
1389: try {
1390: event.wait();
1391: } catch (InterruptedException e) {
1392: throw new SQLException(
1393: "Interrupted while retrieving last ID for controller ID"
1394: + controllerId + " ("
1395: + event.getCatchedException() + ")");
1396: }
1397: }
1398: if (event.getCatchedException() != null)
1399: throw event.getCatchedException();
1400: return event.getValue();
1401: }
1402:
1403: /**
1404: * Returns the recoveringNb value.
1405: *
1406: * @return Returns the recoveringNb.
1407: */
1408: public long getRecoveringNb() {
1409: return recoveringNb;
1410: }
1411:
1412: /**
1413: * Number of queries that can be accumulated into a batch when recovering
1414: *
1415: * @return the recovery batch size
1416: */
1417: public int getRecoveryBatchSize() {
1418: return recoveryBatchSize;
1419: }
1420:
1421: /**
1422: * Returns true if the recovery log has logged a Begin for the given
1423: * transaction id.
1424: *
1425: * @param tid the transaction identifier
1426: * @return true if begin has been logged for this transaction
1427: * @throws SQLException if the query fails on the recovery database
1428: */
1429: public boolean hasLoggedBeginForTransaction(Long tid)
1430: throws SQLException {
1431: // Check for something in the queue first
1432: if (loggerThread.hasLogEntryForTransaction(tid.longValue()))
1433: return true;
1434:
1435: // Check in the database
1436: PreparedStatement pstmt = null;
1437: ResultSet rs = null;
1438: try {
1439: pstmt = getDatabaseConnection().prepareStatement(
1440: "select transaction_id from " + logTableName
1441: + " where transaction_id=?");
1442: // If something was logged for this transaction then begin was logged
1443: // first so if the ResultSet is not empty the begin has been logged.
1444: pstmt.setLong(1, tid.longValue());
1445: rs = pstmt.executeQuery();
1446: return rs.next();
1447: } catch (SQLException e) {
1448: invalidateInternalConnection();
1449: throw e;
1450: } finally {
1451: try {
1452: if (rs != null)
1453: rs.close();
1454: } catch (Exception ignore) {
1455: }
1456: try {
1457: if (pstmt != null)
1458: pstmt.close();
1459: } catch (Exception ignore) {
1460: }
1461: }
1462: }
1463:
1464: /**
1465: * Returns <code>true</code> if at least one backend has started a recover
1466: * process.
1467: *
1468: * @return <code>boolean</code>
1469: */
1470: public synchronized boolean isRecovering() {
1471: return recoveringNb > 0;
1472: }
1473:
1474: /**
1475: * Get the next log entry from the recovery log given the id of the previous
1476: * log entry.
1477: *
1478: * @param previousLogEntryId previous log entry identifier
1479: * @return the next log entry from the recovery log or null if no further
1480: * entry can be found
1481: * @throws SQLException if an error occurs while accesing the recovery log
1482: */
1483: public LogEntry getNextLogEntry(long previousLogEntryId)
1484: throws SQLException {
1485: checkIfShuttingDown();
1486:
1487: ResultSet rs = null;
1488: boolean emptyResult;
1489: PreparedStatement stmt = null;
1490: try {
1491: stmt = getDatabaseConnection()
1492: .prepareStatement(
1493: "SELECT * FROM " + logTableName
1494: + " WHERE log_id=?");
1495: // Note that the statement is closed in the finally block
1496: do {
1497: previousLogEntryId++;
1498: stmt.setLong(1, previousLogEntryId);
1499: // Close ResultSet of previous loop step to free resources.
1500: if (rs != null)
1501: rs.close();
1502: rs = stmt.executeQuery();
1503: emptyResult = !rs.next();
1504: } while (emptyResult && (previousLogEntryId <= logTableId));
1505:
1506: // No more request after this one
1507: if (emptyResult)
1508: return null;
1509:
1510: // Read columns in order to prevent issues with MS SQL Server as reported
1511: // Charles Cordingley.
1512: long id = rs.getLong(COLUMN_INDEX_LOG_ID);
1513: String user = rs.getString(COLUMN_INDEX_VLOGIN);
1514: String sql = rs.getString(COLUMN_INDEX_SQL);
1515: String sqlParams = rs.getString(COLUMN_INDEX_SQL_PARAMS);
1516: String autoConnTran = rs
1517: .getString(COLUMN_INDEX_AUTO_CONN_TRAN);
1518: long transactionId = rs
1519: .getLong(COLUMN_INDEX_TRANSACTION_ID);
1520: long requestId = rs.getLong(COLUMN_INDEX_REQUEST_ID);
1521: long execTime = rs.getLong(COLUMN_INDEX_EXEC_TIME);
1522: int updateCount = rs.getInt(COLUMN_INDEX_UPDATE_COUNT);
1523: String status = rs.getString(COLUMN_INDEX_EXEC_STATUS);
1524: // Note that booleanProcessing = true is the default value in
1525: // AbstractRequest
1526: return new LogEntry(id, user, sql, sqlParams, autoConnTran,
1527: transactionId, false, requestId, execTime,
1528: updateCount, status);
1529: } catch (SQLException e) {
1530: invalidateInternalConnection();
1531: throw new SQLException(Translate.get(
1532: "recovery.jdbc.recover.failed", e));
1533: } finally {
1534: try {
1535: if (rs != null)
1536: rs.close();
1537: } catch (Exception ignore) {
1538: }
1539: try {
1540: if (stmt != null)
1541: stmt.close();
1542: } catch (Exception ignore) {
1543: }
1544: }
1545: }
1546:
1547: /**
1548: * Return the real number of log entries between 2 log ids (usually matching
1549: * checkpoint indices). The SELECT includes both boundaries. The value is
1550: * retrieved through an event posted in the logger thread queue (makes sure
1551: * that all previous entries have been flushed to the log).
1552: *
1553: * @param lowerLogId the lower log id
1554: * @param upperLogId the upper log id
1555: * @return the number of entries between the 2 ids
1556: * @throws SQLException if an error occurs querying the recovery log
1557: */
1558: public long getNumberOfLogEntries(long lowerLogId, long upperLogId)
1559: throws SQLException {
1560: checkIfShuttingDown();
1561:
1562: GetNumberOfLogEntriesEvent event = new GetNumberOfLogEntriesEvent(
1563: lowerLogId, upperLogId);
1564: synchronized (event) {
1565: loggerThread.log(event);
1566: try {
1567: event.wait();
1568: } catch (InterruptedException e) {
1569: throw new SQLException(
1570: "Interrupted while waiting for number of log entries in ["
1571: + lowerLogId + "," + upperLogId + "]");
1572: }
1573: }
1574: return event.getNbOfLogEntries();
1575: }
1576:
1577: /**
1578: * Returns the number of log entries currently in the log table. The method is
1579: * not accurate since this number can change very quickly but it is correct
1580: * enough for management purposes.
1581: *
1582: * @return the number of log entries currently in the log table
1583: * @throws SQLException if an error occurs while counting the number of log
1584: * entries in the log table
1585: */
1586: long getNumberOfLogEntries() throws SQLException {
1587: Statement stmt;
1588: stmt = getDatabaseConnection().createStatement();
1589: ResultSet rs = null;
1590: try {
1591: rs = stmt.executeQuery("select count(*) from "
1592: + logTableName);
1593: rs.next();
1594: return rs.getLong(1);
1595: } finally {
1596: if (rs != null) {
1597: rs.close();
1598: }
1599: if (stmt != null) {
1600: stmt.close();
1601: }
1602: }
1603: }
1604:
1605: /**
1606: * Get the update count result for the execution of the query that has the
1607: * provided unique id.
1608: *
1609: * @param requestId request result to look for
1610: * @return update count or -1 if not found
1611: * @throws SQLException if an error occured
1612: */
1613: public int getUpdateCountResultForQuery(long requestId)
1614: throws SQLException {
1615: checkIfShuttingDown();
1616:
1617: GetUpdateCountEvent event = new GetUpdateCountEvent(
1618: getDatabaseConnection(), this , requestId);
1619: synchronized (event) {
1620: loggerThread.log(event);
1621: try {
1622: event.wait();
1623: } catch (InterruptedException e) {
1624: throw new SQLException(
1625: "Interrupted while waiting for update count of request ID "
1626: + requestId + " ("
1627: + event.getCatchedException() + ")");
1628: }
1629: }
1630: if (event.getCatchedException() instanceof SQLException)
1631: throw (SQLException) event.getCatchedException();
1632:
1633: // Result might not have been found (NoResultAvailableException thrown) in
1634: // which case we just return the default update count (-1) that will cause
1635: // the driver to re-execute the query. If result was found,
1636: // event.getUpdateCount() contains the right value.
1637: return event.getUpdateCount();
1638: }
1639:
1640: /**
1641: * Tries to find a commit for a given transaction ID.
1642: *
1643: * @param transactionId the transaction id to look for
1644: * @return true if commit was found in recovery log, false otherwise
1645: * @throws SQLException if there was a problem while searching the recovery
1646: * log
1647: */
1648: public boolean findCommitForTransaction(long transactionId)
1649: throws SQLException {
1650: checkIfShuttingDown();
1651:
1652: FindCommitEvent event = new FindCommitEvent(
1653: getDatabaseConnection(), this , transactionId);
1654: synchronized (event) {
1655: loggerThread.log(event);
1656: try {
1657: event.wait();
1658: } catch (InterruptedException e) {
1659: throw new SQLException(
1660: "Interrupted while waiting for commit of transaction ID "
1661: + transactionId + " ("
1662: + event.getCatchedException() + ")");
1663: }
1664: }
1665: if (event.getCatchedException() != null)
1666: throw event.getCatchedException();
1667: return event.wasFound();
1668: }
1669:
1670: /**
1671: * Tries to find a commit for a given transaction ID.
1672: *
1673: * @param transactionId the transaction id to look for
1674: * @return commit status if found in recovery log, "" otherwise
1675: * @throws SQLException if there was a problem while searching the recovery
1676: * log
1677: */
1678: public String getCommitStatusForTransaction(long transactionId)
1679: throws SQLException {
1680: checkIfShuttingDown();
1681:
1682: FindCommitEvent event = new FindCommitEvent(
1683: getDatabaseConnection(), this , transactionId);
1684: synchronized (event) {
1685: loggerThread.log(event);
1686: try {
1687: event.wait();
1688: } catch (InterruptedException e) {
1689: throw new SQLException(
1690: "Interrupted while waiting for commit of transaction ID "
1691: + transactionId + " ("
1692: + event.getCatchedException() + ")");
1693: }
1694: }
1695: if (event.getCatchedException() != null)
1696: throw event.getCatchedException();
1697: return event.getStatus();
1698: }
1699:
1700: /**
1701: * Tries to find a rollback for a given transaction ID.
1702: *
1703: * @param transactionId the transaction id to look for
1704: * @return true if rollback was found in recovery log, false otherwise
1705: * @throws SQLException if there was a problem while searching the recovery
1706: * log
1707: */
1708: public boolean findRollbackForTransaction(long transactionId)
1709: throws SQLException {
1710: checkIfShuttingDown();
1711:
1712: FindRollbackEvent event = new FindRollbackEvent(
1713: getDatabaseConnection(), this , transactionId);
1714: synchronized (event) {
1715: loggerThread.log(event);
1716: try {
1717: event.wait();
1718: } catch (InterruptedException e) {
1719: throw new SQLException(
1720: "Interrupted while waiting for rollback of transaction ID "
1721: + transactionId + " ("
1722: + event.getCatchedException() + ")");
1723: }
1724: }
1725: if (event.getCatchedException() != null)
1726: throw event.getCatchedException();
1727: return event.wasFound();
1728: }
1729:
1730: /**
1731: * Tries to find a rollback for a given transaction ID and returns its status
1732: * if found.
1733: *
1734: * @param transactionId the transaction id to look for
1735: * @return commit status if found in recovery log, "" otherwise
1736: * @throws SQLException if there was a problem while searching the recovery
1737: * log
1738: */
1739: public String getRollbackStatusForTransaction(long transactionId)
1740: throws SQLException {
1741: checkIfShuttingDown();
1742:
1743: FindRollbackEvent event = new FindRollbackEvent(
1744: getDatabaseConnection(), this , transactionId);
1745: synchronized (event) {
1746: loggerThread.log(event);
1747: try {
1748: event.wait();
1749: } catch (InterruptedException e) {
1750: throw new SQLException(
1751: "Interrupted while waiting for commit of transaction ID "
1752: + transactionId + " ("
1753: + event.getCatchedException() + ")");
1754: }
1755: }
1756: if (event.getCatchedException() != null)
1757: throw event.getCatchedException();
1758: return event.getStatus();
1759: }
1760:
1761: /**
1762: * Tries to find a close for a given persistent connection ID.
1763: *
1764: * @param persistentConnectionId the persistent connection ID to look for
1765: * @return true if close was found in recovery log, false otherwise
1766: * @throws SQLException if there was a problem while searching the recovery
1767: * log
1768: */
1769: public boolean findCloseForPersistentConnection(
1770: long persistentConnectionId) throws SQLException {
1771: checkIfShuttingDown();
1772:
1773: FindClosePersistentConnectionEvent event = new FindClosePersistentConnectionEvent(
1774: getDatabaseConnection(), this , persistentConnectionId);
1775: synchronized (event) {
1776: loggerThread.log(event);
1777: try {
1778: event.wait();
1779: } catch (InterruptedException e) {
1780: throw new SQLException(
1781: "Interrupted while waiting for close of persistent connection ID "
1782: + persistentConnectionId + " ("
1783: + event.getCatchedException() + ")");
1784: }
1785: }
1786: if (event.getCatchedException() != null)
1787: throw event.getCatchedException();
1788: return event.wasFound();
1789: }
1790:
1791: /**
1792: * Get the next request (begin/commit/rollback or WriteRequest) from the
1793: * recovery log given the id of the previously recovered request.
1794: * <p>
1795: * The id of the request before the first one to recover is given by
1796: * getCheckpointRequestId.
1797: *
1798: * @param previousRequestId id of the previously recovered request
1799: * @param scheduler Scheduler that will be used to generate fake TransactionId
1800: * when recovering requests in autocommit mode
1801: * @return AbstractTask task corresponding to the next request to recover or
1802: * null if no such request exists
1803: * @exception SQLException if an error occurs
1804: * @see #getCheckpointLogId(String)
1805: */
1806: public RecoveryTask recoverNextRequest(long previousRequestId,
1807: AbstractScheduler scheduler) throws SQLException {
1808: RecoveryTask task = null;
1809:
1810: // Get the request with the id after previousRequestId.
1811: LogEntry logEntry = getNextLogEntry(previousRequestId);
1812: if (logEntry == null)
1813: return null;
1814:
1815: // Construct the request object according to its type
1816: long transactionId = logEntry.getTid();
1817: long id = logEntry.getLogId();
1818: String user = logEntry.getLogin();
1819: String sql = logEntry.getQuery().trim();
1820: String status = logEntry.getExecutionStatus();
1821:
1822: boolean escapeProcessing = true;
1823:
1824: if (CLOSE_PERSISTENT_CONNECTION.equals(sql)) {
1825: if (logger.isDebugEnabled())
1826: logger.debug("closing persistent connection: "
1827: + transactionId);
1828: task = new RecoveryTask(transactionId, id,
1829: new ClosePersistentConnectionTask(1, 1, user,
1830: transactionId), status);
1831: } else if (OPEN_PERSISTENT_CONNECTION.equals(sql)) {
1832: if (logger.isDebugEnabled())
1833: logger.debug("opening persistent connection: "
1834: + transactionId);
1835: task = new RecoveryTask(transactionId, id,
1836: new OpenPersistentConnectionTask(1, 1, user,
1837: transactionId), status);
1838: } else if (BEGIN.equals(sql)) {
1839: // DO NOT SWITCH THIS BLOCK WITH THE NEXT if BLOCK
1840: // begin on a non-persistent connection
1841: task = new RecoveryTask(transactionId, id, new BeginTask(1,
1842: 1, new TransactionMetaData(transactionId,
1843: (long) timeout * 1000, user, false, 0)),
1844: status);
1845: if (logger.isDebugEnabled())
1846: logger.debug("begin transaction: " + transactionId);
1847: } else if (sql.startsWith(BEGIN)) {
1848: // DO NOT SWITCH THIS BLOCK WITH THE PREVIOUS if BLOCK
1849: // begin on a persistent connection, extract the persistent connection id
1850: long persistentConnectionId = Long.parseLong(sql.substring(
1851: BEGIN.length()).trim());
1852: task = new RecoveryTask(transactionId, id, new BeginTask(1,
1853: 1, new TransactionMetaData(transactionId,
1854: (long) timeout * 1000, user, true,
1855: persistentConnectionId)), status);
1856: if (logger.isDebugEnabled())
1857: logger.debug("begin transaction: " + transactionId
1858: + " on persistent connection "
1859: + persistentConnectionId);
1860: } else if (COMMIT.equals(sql)) { // commit (we do not care about the persistent connection id here)
1861: task = new RecoveryTask(transactionId, id, new CommitTask(
1862: 1, 1, new TransactionMetaData(transactionId,
1863: (long) timeout * 1000, user, false, 0)),
1864: status);
1865: if (logger.isDebugEnabled())
1866: logger.debug("commit transaction: " + transactionId);
1867: } else if (ROLLBACK.equals(sql)) { // rollback (we do not care about the persistent connection id here)
1868: int index = sql.indexOf(' ');
1869: if (index == -1) {
1870: task = new RecoveryTask(transactionId, id,
1871: new RollbackTask(1, 1, new TransactionMetaData(
1872: transactionId, (long) timeout * 1000,
1873: user, false, 0)), status);
1874: if (logger.isDebugEnabled())
1875: logger.debug("rollback transaction: "
1876: + transactionId);
1877: } else { // Rollback to savepoint
1878: String savepointName = sql.substring(index).trim();
1879: task = new RecoveryTask(transactionId, id,
1880: new RollbackToSavepointTask(1, 1,
1881: new TransactionMetaData(transactionId,
1882: (long) timeout * 1000, user,
1883: false, 0), savepointName),
1884: status);
1885: if (logger.isDebugEnabled())
1886: logger.debug("rollback transaction to savepoint: "
1887: + transactionId);
1888: }
1889: } else if (sql.startsWith("savepoint ")) { // set savepoint (we do not care about the persistent connection id here)
1890: String savepointName = sql.substring(sql.indexOf(' '))
1891: .trim();
1892: task = new RecoveryTask(transactionId, id,
1893: new SavepointTask(1, 1, new TransactionMetaData(
1894: transactionId, (long) timeout * 1000, user,
1895: false, 0), savepointName), status);
1896: if (logger.isDebugEnabled())
1897: logger.debug("transaction set savepoint: "
1898: + transactionId);
1899: } else if (sql.startsWith("release ")) { // release savepoint (we do not care about the persistent connection id
1900: // here)
1901: String savepointName = sql.substring(sql.indexOf(' '));
1902: task = new RecoveryTask(transactionId, id,
1903: new ReleaseSavepointTask(1, 1,
1904: new TransactionMetaData(transactionId,
1905: (long) timeout * 1000, user, false,
1906: 0), savepointName), status);
1907: if (logger.isDebugEnabled())
1908: logger.debug("transaction release savepoint: "
1909: + transactionId);
1910: } else {
1911: // Use regular expressions to determine object to rebuild
1912: AbstractRequest decodedRequest = requestFactory
1913: .requestFromString(sql, false, escapeProcessing,
1914: timeout, "\n");
1915: if (decodedRequest != null) {
1916: setRequestParameters(logEntry, decodedRequest,
1917: scheduler);
1918: if (logger.isDebugEnabled())
1919: logger.debug("recovering "
1920: + decodedRequest.getType());
1921:
1922: if (decodedRequest instanceof AbstractWriteRequest) {
1923: task = new RecoveryTask(
1924: transactionId,
1925: id,
1926: new StatementExecuteUpdateTask(
1927: 1,
1928: 1,
1929: (AbstractWriteRequest) decodedRequest),
1930: status);
1931: } else if (decodedRequest instanceof StoredProcedure) {
1932: task = new RecoveryTask(transactionId, id,
1933: new CallableStatementExecuteTask(1, 1,
1934: (StoredProcedure) decodedRequest,
1935: null), status);
1936: } else {
1937: if (decodedRequest instanceof UnknownWriteRequest) {
1938: task = new RecoveryTask(
1939: transactionId,
1940: id,
1941: new StatementExecuteTask(
1942: 1,
1943: 1,
1944: (AbstractWriteRequest) decodedRequest,
1945: null), status);
1946: } else
1947: // read request
1948: {
1949: if (decodedRequest instanceof SelectRequest) {
1950: // Set a fake cursor name in all cases (addresses SEQUOIA-396)
1951: decodedRequest
1952: .setCursorName("replay_cursor");
1953: }
1954: /*
1955: * For unknown read requests, we do not care about "select for
1956: * update" pattern here since no broadcast will occur anyway
1957: */
1958: task = new RecoveryTask(transactionId, id,
1959: new StatementExecuteQueryTask(1, 1,
1960: (SelectRequest) decodedRequest,
1961: null), status);
1962: }
1963: }
1964: }
1965: }
1966: return task;
1967: }
1968:
1969: private void setRequestParameters(LogEntry entry,
1970: AbstractRequest decodedRequest, AbstractScheduler scheduler) {
1971: decodedRequest.setLogin(entry.getLogin());
1972: decodedRequest.setSqlOrTemplate(entry.getQuery());
1973: decodedRequest.setPreparedStatementParameters(entry
1974: .getQueryParams());
1975: if (LogEntry.TRANSACTION.equals(entry.getAutoConnTrans())) {
1976: decodedRequest.setIsAutoCommit(false);
1977: decodedRequest.setTransactionId(entry.getTid());
1978: decodedRequest
1979: .setTransactionIsolation(org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL);
1980: } else {
1981: decodedRequest.setIsAutoCommit(true);
1982: decodedRequest.setTransactionId(scheduler
1983: .getNextTransactionId());
1984: if (LogEntry.PERSISTENT_CONNECTION.equals(entry
1985: .getAutoConnTrans())) {
1986: decodedRequest.setPersistentConnection(true);
1987: decodedRequest
1988: .setPersistentConnectionId(entry.getTid());
1989: }
1990: }
1991: }
1992:
1993: //
1994: //
1995: // Checkpoint Management
1996: //
1997: //
1998:
1999: /**
2000: * Deletes recovery log entries that are older than specified checkpoint. The
2001: * log entry pointed to by 'checkpointName' is kept in the recovery log to
2002: * make sure that logId numbering is not reset in case of full clearance.
2003: * (upon vdb restart, member logTableId is initialized with max(log_id) found
2004: * in the database, see code in intializeDatabase()).
2005: *
2006: * @param checkpointName the name of the checkpoint uptil which log entries
2007: * should be removed
2008: * @throws SQLException in case of error.
2009: */
2010: public void deleteLogEntriesBeforeCheckpoint(String checkpointName)
2011: throws SQLException {
2012: checkIfShuttingDown();
2013:
2014: long id = getCheckpointLogId(checkpointName);
2015: loggerThread.log(new DeleteLogEntriesAndCheckpointBetweenEvent(
2016: -1, id));
2017: }
2018:
2019: /**
2020: * Returns a time-ordered odered (most recent first) array of names of all the
2021: * checkpoint available in the recovery log. <strong>this method may not
2022: * return the exact list of checkpoint names (since it is called concurrently
2023: * to events posted to the recovery log queue).</strong>
2024: *
2025: * @return a time-ordered odered (most recent first) <code>ArrayList</code>
2026: * of <code>String</code> checkpoint names
2027: * @throws SQLException if fails
2028: * @deprecated use getCheckpoint().values() instead
2029: */
2030: public ArrayList getCheckpointNames() throws SQLException {
2031: checkIfShuttingDown();
2032:
2033: PreparedStatement stmt = null;
2034:
2035: try {
2036: // Ordering is not quite newest first if two checkpoints happen to have
2037: // the same log_id. We sort by name to make improper ordering less
2038: // likely. TODO: Add timestamp or reformat checkpoint names so that they
2039: // sort by date.
2040: if (logger.isDebugEnabled())
2041: logger.debug("Retrieving checkpoint names list");
2042: stmt = getDatabaseConnection().prepareStatement(
2043: "SELECT name from " + checkpointTableName
2044: + " ORDER BY log_id DESC, name DESC");
2045: ResultSet rs = stmt.executeQuery();
2046: ArrayList list = new ArrayList();
2047: while (rs.next()) {
2048: list.add(rs.getString(1));
2049: }
2050: rs.close();
2051: return list;
2052: } catch (Exception e) {
2053: invalidateInternalConnection();
2054: throw new SQLException(Translate.get(
2055: "recovery.jdbc.checkpoint.list.failed", e));
2056: } finally {
2057: try {
2058: if (stmt != null)
2059: stmt.close();
2060: } catch (SQLException ignore) {
2061: }
2062: }
2063: }
2064:
2065: /**
2066: * Returns a <code>Map<String, String></code> of checkpoints where the
2067: * keys are the checkpoint names and the values are the corresponding log IDs.
2068: * The Map is orderered by log IDs (newest first).
2069: *
2070: * @return a <code>Map<String, String></code> of checkpoints (key=
2071: * checkpoint name, value = log id)
2072: * @throws SQLException if an error occurs while retrieving the checkpoints
2073: * information from the checkpoint table
2074: */
2075: Map/* <String, String> */getCheckpoints() throws SQLException {
2076: checkIfShuttingDown();
2077:
2078: PreparedStatement stmt = null;
2079:
2080: try {
2081: if (logger.isDebugEnabled())
2082: logger.debug("Retrieving checkpoint names list");
2083: stmt = getDatabaseConnection().prepareStatement(
2084: "SELECT log_id, name from " + checkpointTableName
2085: + " ORDER BY log_id DESC");
2086: ResultSet rs = stmt.executeQuery();
2087: Map checkpoints = new TreeMap();
2088: while (rs.next()) {
2089: String name = rs.getString("name");
2090: String id = rs.getString("log_id");
2091: checkpoints.put(name, id);
2092: }
2093: rs.close();
2094: return checkpoints;
2095: } catch (Exception e) {
2096: invalidateInternalConnection();
2097: throw new SQLException(Translate.get(
2098: "recovery.jdbc.checkpoint.list.failed", e));
2099: } finally {
2100: try {
2101: if (stmt != null)
2102: stmt.close();
2103: } catch (SQLException ignore) {
2104: }
2105: }
2106: }
2107:
2108: /**
2109: * Returns a list of checkpoint names (strings) that all represent the same
2110: * point in time (same log id) as specified 'checkpointName'. The list is
2111: * never null and includes the specified 'checkpointName'.
2112: *
2113: * @param checkpointName the checkpoint name
2114: * @return a list of checkpoint names (strings) that all represent the same
2115: * point in time (same log id)
2116: * @throws SQLException if no such checkpoint exists of if there is an error
2117: */
2118: public String[] getCheckpointNameAliases(String checkpointName)
2119: throws SQLException {
2120: long logId = getCheckpointLogId(checkpointName);
2121: Object ret = executeQuery("SELECT name from "
2122: + checkpointTableName + " where log_id=" + logId
2123: + " ORDER BY log_id DESC", new QueryHandler() {
2124: public Object run(ResultSet rs) throws SQLException {
2125: ArrayList aliases = new ArrayList();
2126: while (rs.next())
2127: aliases.add(rs.getString("name"));
2128: return aliases.toArray(new String[aliases.size()]);
2129: }
2130: });
2131: return (String[]) ret;
2132: }
2133:
/**
 * Callback used with <code>executeQuery(String, QueryHandler)</code>: the
 * query is executed there and its result set is handed to {@link #run} for
 * processing.
 */
abstract class QueryHandler {
    /**
     * To be defined by query handler. This is supposed to contain the
     * processing of the result set that is returned by the query execution.
     *
     * @param rs the ResultSet returned by the query execution
     * @return an object containing the result of the processing of the handler.
     * @throws SQLException if reading the result set fails
     */
    public abstract Object run(ResultSet rs) throws SQLException;
}
2145:
2146: Object executeQuery(String query, QueryHandler handler)
2147: throws SQLException {
2148: checkIfShuttingDown();
2149:
2150: PreparedStatement stmt = null;
2151: try {
2152: stmt = getDatabaseConnection().prepareStatement(query);
2153: ResultSet rs = stmt.executeQuery();
2154: return handler.run(rs);
2155: } catch (Exception e) {
2156: invalidateInternalConnection();
2157: throw new SQLException(Translate.get(
2158: "recovery.jdbc.query.failed", e));
2159: } finally {
2160: try {
2161: if (stmt != null)
2162: stmt.close(); // this also closes the resultSet
2163: } catch (SQLException ignore) {
2164: }
2165: }
2166: }
2167:
2168: /**
2169: * Get the log id corresponding to a given checkpoint. This is the first step
2170: * in a recovery process. Following steps consist in calling
2171: * recoverNextRequest.
2172: *
2173: * @param checkpointName Name of the checkpoint
2174: * @return long the log identifier corresponding to this checkpoint.
2175: * @exception SQLException if an error occurs or the checkpoint does not exist
2176: * @see #recoverNextRequest(long)
2177: */
2178: public long getCheckpointLogId(String checkpointName)
2179: throws SQLException {
2180: checkIfShuttingDown();
2181:
2182: GetCheckpointLogIdEvent event = new GetCheckpointLogIdEvent(
2183: getDatabaseConnection(), getCheckpointTableName(),
2184: checkpointName);
2185: synchronized (event) {
2186: loggerThread.log(event);
2187: try {
2188: event.wait();
2189: } catch (InterruptedException e) {
2190: throw new SQLException(
2191: "Interrupted while waiting for id of checkpoint "
2192: + checkpointName + " ("
2193: + event.getCatchedException() + ")");
2194: }
2195: }
2196: if (event.getCatchedException() != null)
2197: throw event.getCatchedException();
2198: return event.getCheckpointLogId();
2199: }
2200:
2201: /**
2202: * Get the log entry corresponding to a given checkpoint. This is used to set
2203: * a checkpoint globally cluster-wide using the unique log entry.
2204: *
2205: * @param checkpointName Name of the checkpoint
2206: * @return a CheckpointLogEntry corresponding to this checkpoint.
2207: * @exception SQLException if an error occurs or the checkpoint does not exist
2208: * @see #recoverNextRequest(long)
2209: */
2210: public CheckpointLogEntry getCheckpointLogEntry(
2211: String checkpointName) throws SQLException {
2212: checkIfShuttingDown();
2213:
2214: GetCheckpointLogEntryEvent event = new GetCheckpointLogEntryEvent(
2215: getDatabaseConnection(), this , checkpointName);
2216: synchronized (event) {
2217: loggerThread.log(event);
2218: try {
2219: event.wait();
2220: } catch (InterruptedException e) {
2221: throw new SQLException(
2222: "Interrupted while waiting for id of checkpoint "
2223: + checkpointName + " ("
2224: + event.getCatchedException() + ")");
2225: }
2226: }
2227: if (event.getCatchedException() != null)
2228: throw event.getCatchedException();
2229: return event.getCheckpointLogEntry();
2230: }
2231:
2232: /**
2233: * Shift the log entries from the given checkpoint id by the shift factor
2234: * provided. All entries with an id below nowCheckpointId and all the others
2235: * will have their id increased by shift.
2236: *
2237: * @param fromId id where to start the move
2238: * @param shift increment to add to log entries id
2239: */
2240: public void moveEntries(long fromId, long shift) {
2241: synchronized (this ) {
2242: // Move current entries in the database log
2243: loggerThread.log(new ShiftLogEntriesEvent(fromId, shift));
2244: // Move current id forward so that next entries do not have to be
2245: // relocated
2246: logTableId = logTableId + shift;
2247: }
2248: }
2249:
2250: /**
2251: * Delete the log entries from the fromId id to toId.
2252: *
2253: * @param fromId id where to start the move
2254: * @param toId increment to add to log entries id
2255: */
2256: public void deleteLogEntriesAndCheckpointBetween(long fromId,
2257: long toId) {
2258: synchronized (this ) {
2259: // Delete entries in the database log
2260: loggerThread
2261: .log(new DeleteLogEntriesAndCheckpointBetweenEvent(
2262: fromId, toId));
2263: }
2264: }
2265:
2266: /**
2267: * Remove a checkpoint from the recovery. This is useful for recovery
2268: * maintenance
2269: *
2270: * @param checkpointName to remove
2271: */
2272: public void removeCheckpoint(String checkpointName) {
2273: RemoveCheckpointEvent removeCheckpointEvent = new RemoveCheckpointEvent(
2274: checkpointName);
2275: synchronized (removeCheckpointEvent) {
2276: loggerThread.log(removeCheckpointEvent);
2277: try {
2278: removeCheckpointEvent.wait();
2279: } catch (InterruptedException ignore) {
2280: }
2281: }
2282: }
2283:
2284: /**
2285: * Store the state of the backend in the recovery log
2286: *
2287: * @param databaseName the virtual database name
2288: * @param backendRecoveryInfo the backend recovery information to store
2289: * @throws SQLException if cannot proceed
2290: */
2291: public void storeBackendRecoveryInfo(String databaseName,
2292: BackendRecoveryInfo backendRecoveryInfo)
2293: throws SQLException {
2294: checkIfShuttingDown();
2295:
2296: PreparedStatement stmt = null;
2297: PreparedStatement stmt2 = null;
2298: if ((backendRecoveryInfo.getCheckpoint() == null)
2299: || (backendRecoveryInfo.getCheckpoint().length() == 0)
2300: || (backendRecoveryInfo.getBackendState() != BackendState.DISABLED))
2301: backendRecoveryInfo.setCheckpoint(""); // No checkpoint
2302: else { // Check checkpoint name validity
2303: getCheckpointLogId(backendRecoveryInfo.getCheckpoint());
2304: }
2305:
2306: try {
2307: // 1. Get the reference point to delete
2308: stmt = getDatabaseConnection()
2309: .prepareStatement(
2310: "SELECT * FROM "
2311: + backendTableName
2312: + " WHERE backend_name LIKE ? and database_name LIKE ?");
2313: stmt.setString(1, backendRecoveryInfo.getBackendName());
2314: stmt.setString(2, databaseName);
2315: ResultSet rs = stmt.executeQuery();
2316: boolean mustUpdate = rs.next();
2317: rs.close();
2318: if (!mustUpdate) {
2319: stmt2 = getDatabaseConnection().prepareStatement(
2320: "INSERT INTO " + backendTableName
2321: + " values(?,?,?,?)");
2322: stmt2.setString(1, databaseName);
2323: stmt2
2324: .setString(2, backendRecoveryInfo
2325: .getBackendName());
2326: stmt2.setInt(3, backendRecoveryInfo.getBackendState());
2327: stmt2.setString(4, backendRecoveryInfo.getCheckpoint());
2328: if (stmt2.executeUpdate() != 1)
2329: throw new SQLException(
2330: "Error while inserting new backend reference. Incorrect number of rows");
2331: } else {
2332: stmt2 = getDatabaseConnection()
2333: .prepareStatement(
2334: "UPDATE "
2335: + backendTableName
2336: + " set backend_state=?,checkpoint_name=? where backend_name=? and database_name=?");
2337: stmt2.setInt(1, backendRecoveryInfo.getBackendState());
2338: stmt2.setString(2, backendRecoveryInfo.getCheckpoint());
2339: stmt2
2340: .setString(3, backendRecoveryInfo
2341: .getBackendName());
2342: stmt2.setString(4, databaseName);
2343: if (stmt2.executeUpdate() != 1)
2344: throw new SQLException(
2345: "Error while updating backend reference. Incorrect number of rows");
2346: }
2347: } catch (SQLException e) {
2348: invalidateInternalConnection();
2349:
2350: logger.warn("Failed to store backend recovery info", e);
2351:
2352: throw new SQLException("Unable to update checkpoint '"
2353: + backendRecoveryInfo.getCheckpoint()
2354: + "' for backend:"
2355: + backendRecoveryInfo.getBackendName());
2356: } finally {
2357: try {
2358: if (stmt != null)
2359: stmt.close();
2360: } catch (Exception ignore) {
2361: }
2362: try {
2363: if (stmt2 != null)
2364: stmt2.close();
2365: } catch (Exception ignore) {
2366: }
2367: }
2368: }
2369:
2370: /**
2371: * Store a Checkpoint using the current log state.
2372: *
2373: * @param checkpointName Name of the checkpoint
2374: * @exception SQLException if an error occurs
2375: */
2376: public void storeCheckpoint(String checkpointName)
2377: throws SQLException {
2378: // Check if a checkpoint with the name checkpointName already exists
2379: if (!validCheckpointName(checkpointName)) {
2380: throw new SQLException(Translate.get(
2381: "recovery.jdbc.checkpoint.duplicate",
2382: checkpointName));
2383: }
2384:
2385: loggerThread.log(new StoreCheckpointWithLogIdEvent(
2386: checkpointName, logTableId));
2387: }
2388:
2389: /**
2390: * Store a Checkpoint with the given log id.
2391: *
2392: * @param checkpointName Name of the checkpoint
2393: * @param logId log id this checkpoint points to
2394: * @exception SQLException if an error occurs
2395: */
2396: public void storeCheckpoint(String checkpointName, long logId)
2397: throws SQLException {
2398: // Check if a checkpoint with the name checkpointName already exists
2399: if (!validCheckpointName(checkpointName)) {
2400: throw new SQLException(Translate.get(
2401: "recovery.jdbc.checkpoint.duplicate",
2402: checkpointName));
2403: }
2404:
2405: loggerThread.log(new StoreCheckpointWithLogIdEvent(
2406: checkpointName, logId));
2407: }
2408:
2409: //
2410: //
2411: // Dump management
2412: //
2413: //
2414:
2415: /**
2416: * Get the DumpInfo element corresponding to the given dump name. Returns null
2417: * if no information is found for this dump name.
2418: *
2419: * @param dumpName the name of the dump to look for
2420: * @return a <code>DumpInfo</code> object or null if not found in the table
2421: * @throws SQLException if a recovery log database access error occurs
2422: */
2423: public DumpInfo getDumpInfo(String dumpName) throws SQLException {
2424: checkIfShuttingDown();
2425:
2426: PreparedStatement stmt = null;
2427:
2428: try {
2429: if (logger.isDebugEnabled())
2430: logger.debug("Retrieving dump " + dumpName
2431: + " information");
2432: stmt = getDatabaseConnection().prepareStatement(
2433: "SELECT * from " + dumpTableName
2434: + " WHERE dump_name LIKE ?");
2435: stmt.setString(1, dumpName);
2436:
2437: ResultSet rs = stmt.executeQuery();
2438: DumpInfo dumpInfo = null;
2439: if (rs.next()) {
2440: dumpInfo = new DumpInfo(rs.getString("dump_name"), rs
2441: .getTimestamp("dump_date"), rs
2442: .getString("dump_path"), rs
2443: .getString("dump_format"), rs
2444: .getString("checkpoint_name"), rs
2445: .getString("backend_name"), rs
2446: .getString(dumpTableTablesColumnName));
2447: }
2448: // else not found, return dumpInfo=null;
2449:
2450: rs.close();
2451: return dumpInfo;
2452: } catch (Exception e) {
2453: invalidateInternalConnection();
2454: throw new SQLException(Translate.get(
2455: "recovery.jdbc.dump.info.failed", new String[] {
2456: dumpName, e.getMessage() }));
2457: } finally {
2458: try {
2459: if (stmt != null)
2460: stmt.close();
2461: } catch (SQLException ignore) {
2462: }
2463: }
2464: }
2465:
2466: /**
2467: * Retrieve the list of available dumps.
2468: *
2469: * @return an <code>ArrayList</code> of <code>DumpInfo</code> objects
2470: * @throws SQLException if a recovery log database access error occurs
2471: */
2472: public ArrayList getDumpList() throws SQLException {
2473: checkIfShuttingDown();
2474:
2475: PreparedStatement stmt = null;
2476:
2477: try {
2478: if (logger.isDebugEnabled())
2479: logger.debug("Retrieving dump list");
2480: stmt = getDatabaseConnection().prepareStatement(
2481: "SELECT * FROM " + dumpTableName
2482: + " ORDER BY dump_date DESC");
2483: ResultSet rs = stmt.executeQuery();
2484: ArrayList list = new ArrayList();
2485: while (rs.next()) {
2486: list.add(new DumpInfo(rs.getString("dump_name"), rs
2487: .getTimestamp("dump_date"), rs
2488: .getString("dump_path"), rs
2489: .getString("dump_format"), rs
2490: .getString("checkpoint_name"), rs
2491: .getString("backend_name"), rs
2492: .getString(dumpTableTablesColumnName)));
2493: }
2494: rs.close();
2495: return list;
2496: } catch (Exception e) {
2497: invalidateInternalConnection();
2498: throw new SQLException(Translate.get(
2499: "recovery.jdbc.dump.list.failed", e));
2500: } finally {
2501: try {
2502: if (stmt != null)
2503: stmt.close();
2504: } catch (SQLException ignore) {
2505: }
2506: }
2507: }
2508:
2509: /**
2510: * Remove a dump information from the dump table base.
2511: *
2512: * @param dumpInfo the <code>DumpInfo</code> to remove
2513: * @throws SQLException if the dump has has not been removed from the dump
2514: * table
2515: */
2516: public void removeDump(DumpInfo dumpInfo) throws SQLException {
2517: checkIfShuttingDown();
2518:
2519: PreparedStatement stmt = null;
2520:
2521: try {
2522: if (logger.isDebugEnabled()) {
2523: logger.debug("removing dump " + dumpInfo.getDumpName());
2524: }
2525: stmt = getDatabaseConnection().prepareStatement(
2526: "DELETE FROM " + dumpTableName
2527: + " WHERE dump_name=?");
2528: stmt.setString(1, dumpInfo.getDumpName());
2529:
2530: stmt.executeUpdate();
2531: } catch (Exception e) {
2532: invalidateInternalConnection();
2533: throw new SQLException(Translate.get(
2534: "recovery.jdbc.dump.remove.failed", new String[] {
2535: dumpInfo.getDumpName(), e.getMessage() }));
2536: } finally {
2537: try {
2538: if (stmt != null)
2539: stmt.close();
2540: } catch (SQLException ignore) {
2541: }
2542: }
2543: }
2544:
2545: /**
2546: * Set DumpInfo, thereby making a new dump available for restore.
2547: *
2548: * @param dumpInfo the dump info to create.
2549: * @throws VirtualDatabaseException if an error occurs
2550: */
2551: public void setDumpInfo(DumpInfo dumpInfo)
2552: throws VirtualDatabaseException {
2553: try {
2554: storeDump(dumpInfo);
2555: } catch (SQLException e) {
2556: throw new VirtualDatabaseException(e);
2557: }
2558: }
2559:
2560: /**
2561: * Store the given dump information in the dump table
2562: *
2563: * @param dump the <code>DumpInfo</code> to store
2564: * @throws SQLException if a recovery log database access error occurs
2565: */
2566: public void storeDump(DumpInfo dump) throws SQLException {
2567: checkIfShuttingDown();
2568:
2569: PreparedStatement stmt = null;
2570:
2571: if (dump == null)
2572: throw new NullPointerException(
2573: "Invalid null dump in JDBCRecoverylog.storeDump");
2574:
2575: try {
2576: if (logger.isDebugEnabled())
2577: logger.debug("Storing dump " + dump.getDumpName());
2578: stmt = getDatabaseConnection().prepareStatement(
2579: "INSERT INTO " + dumpTableName
2580: + " VALUES (?,?,?,?,?,?,?)");
2581: stmt.setString(1, dump.getDumpName());
2582: stmt.setTimestamp(2, new Timestamp(dump.getDumpDate()
2583: .getTime()));
2584: stmt.setString(3, dump.getDumpPath());
2585: stmt.setString(4, dump.getDumpFormat());
2586: stmt.setString(5, dump.getCheckpointName());
2587: stmt.setString(6, dump.getBackendName());
2588: stmt.setString(7, dump.getTables());
2589:
2590: stmt.executeUpdate();
2591: } catch (Exception e) {
2592: invalidateInternalConnection();
2593: throw new SQLException(Translate.get(
2594: "recovery.jdbc.dump.store.failed", new String[] {
2595: dump.getDumpName(), e.getMessage() }));
2596: } finally {
2597: try {
2598: if (stmt != null)
2599: stmt.close();
2600: } catch (SQLException ignore) {
2601: }
2602: }
2603: }
2604:
2605: /**
2606: * @see StoreCheckpointWithLogIdEvent
2607: * @param dumpCheckpointName name of the checkpoint to store
2608: * @param checkpointId id of the checkpoint
2609: */
2610: public void storeDumpCheckpointName(String dumpCheckpointName,
2611: long checkpointId) {
2612: loggerThread.log(new StoreCheckpointWithLogIdEvent(
2613: dumpCheckpointName, checkpointId));
2614: }
2615:
2616: /**
2617: * Updates specified DumpTable entry column with specified value.
2618: *
2619: * @param dumpName the name of the dump record to update
2620: * @param columnName the name of the column to update
2621: * @param value the value to set
2622: * @throws SQLException if any error occurs (invalid dump name, sql query
2623: * syntax errors, hsqldb access errors, ...)
2624: */
2625: void updateDumpTableColumn(String dumpName, String columnName,
2626: String value) throws SQLException {
2627: checkIfShuttingDown();
2628:
2629: DumpInfo dumpInfo = getDumpInfo(dumpName);
2630: if (dumpInfo == null)
2631: throw new SQLException("No such dump name: " + dumpName);
2632:
2633: PreparedStatement stmt = null;
2634: int updateCount = 0;
2635: try {
2636: if (logger.isDebugEnabled())
2637: logger.debug("Updating '" + columnName + "' for dump '"
2638: + dumpInfo.getDumpName() + "' to " + value);
2639: stmt = getDatabaseConnection().prepareStatement(
2640: "UPDATE " + dumpTableName + " SET " + columnName
2641: + "=? WHERE dump_name=?");
2642: stmt.setString(1, value);
2643: stmt.setString(2, dumpName);
2644: updateCount = stmt.executeUpdate();
2645: } catch (Exception e) {
2646: invalidateInternalConnection();
2647: throw new SQLException(Translate.get(
2648: "recovery.jdbc.dump.update.column.failed",
2649: new String[] { dumpName, e.getMessage() }));
2650: } finally {
2651: try {
2652: if (stmt != null)
2653: stmt.close();
2654: } catch (SQLException ignore) {
2655: }
2656: }
2657:
2658: if (updateCount != 1) {
2659: String msg = "Invalid update count after dumpTable update.";
2660: logger.error(msg);
2661: throw new SQLException(msg);
2662: }
2663: }
2664:
2665: /**
2666: * Update the path name for a given checkpoint.
2667: *
2668: * @param dumpName the dump name
2669: * @param newPath the new path to set
2670: * @throws SQLException if a recovery log database access error occurs
2671: */
2672: public void updateDumpPath(String dumpName, String newPath)
2673: throws SQLException {
2674: updateDumpTableColumn(dumpName, "dump_path", newPath);
2675: }
2676:
2677: /**
2678: * Update the checkpoint name for specified dump, making it available for
2679: * restore operations.
2680: *
2681: * @param dumpName the dump name
2682: * @param checkpointName the new chekpoint to set
2683: * @throws SQLException if a recovery log database access error occurs
2684: */
2685: public void updateDumpCheckpoint(String dumpName,
2686: String checkpointName) throws SQLException {
2687: updateDumpTableColumn(dumpName, "checkpoint_name",
2688: checkpointName);
2689: }
2690:
2691: //
2692: //
2693: // Recovery log database tables management
2694: //
2695: //
2696:
2697: /**
2698: * Checks if the recovery log and checkpoint tables exist, and create them if
2699: * they do not exist. This method also starts the logger thread.
2700: */
2701: public void checkRecoveryLogTables() {
2702: try {
2703: intializeDatabase();
2704: } catch (SQLException e) {
2705: throw new RuntimeException(
2706: "Unable to initialize the database: " + e);
2707: }
2708:
2709: // Start the logger thread
2710: loggerThread = new LoggerThread(this );
2711: loggerThread.start();
2712: }
2713:
2714: /**
2715: * Returns the backendTableName value.
2716: *
2717: * @return Returns the backendTableName.
2718: */
2719: public String getBackendTableName() {
2720: return backendTableName;
2721: }
2722:
2723: /**
2724: * Returns the checkpointTableName value.
2725: *
2726: * @return Returns the checkpointTableName.
2727: */
2728: public String getCheckpointTableName() {
2729: return checkpointTableName;
2730: }
2731:
2732: /**
2733: * Returns the logTableName value.
2734: *
2735: * @return Returns the logTableName.
2736: */
2737: public String getLogTableName() {
2738: return logTableName;
2739: }
2740:
2741: /**
2742: * Returns the logTableSqlColumnName value.
2743: *
2744: * @return Returns the logTableSqlColumnName.
2745: */
2746: public String getLogTableSqlColumnName() {
2747: return logTableSqlColumnName;
2748: }
2749:
2750: /**
2751: * Sets the backend table create statement
2752: *
2753: * @param createTable statement to create the table
2754: * @param tableName the backend table name
2755: * @param checkpointNameType type for the checkpointName column
2756: * @param backendNameType type for the backendName column
2757: * @param backendStateType type for the backendState column
2758: * @param databaseNameType type for the databaseName column
2759: * @param extraStatement like primary keys
2760: */
2761: public void setBackendTableCreateStatement(String createTable,
2762: String tableName, String checkpointNameType,
2763: String backendNameType, String backendStateType,
2764: String databaseNameType, String extraStatement) {
2765: this .backendTableCreateTable = createTable;
2766: this .backendTableName = tableName;
2767: this .backendTableDatabaseName = databaseNameType;
2768: this .backendTableBackendName = backendNameType;
2769: this .backendTableBackendState = backendStateType;
2770: this .backendTableCheckpointName = checkpointNameType;
2771: this .backendTableExtraStatement = extraStatement;
2772: this .backendTableCreateStatement = createTable + " "
2773: + backendTableName + " (database_name "
2774: + databaseNameType + ", backend_name "
2775: + backendNameType + ",backend_state "
2776: + backendStateType + ", checkpoint_name "
2777: + checkpointNameType + " " + extraStatement + ")";
2778:
2779: if (logger.isDebugEnabled())
2780: logger.debug(Translate.get(
2781: "recovery.jdbc.backendtable.statement",
2782: backendTableCreateStatement));
2783: }
2784:
2785: /**
2786: * Sets the checkpoint table name and create statement.
2787: *
2788: * @param createTable statement to create the table
2789: * @param tableName name of the checkpoint table.
2790: * @param nameType type for the name column
2791: * @param logIdType type for the log_id column
2792: * @param extraStatement like primary keys
2793: */
2794: public void setCheckpointTableCreateStatement(String createTable,
2795: String tableName, String nameType, String logIdType,
2796: String extraStatement) {
2797: this .checkpointTableCreateTable = createTable;
2798: this .checkpointTableName = tableName;
2799: this .checkpointTableNameType = nameType;
2800: this .checkpointTableLogIdType = logIdType;
2801: this .checkpointTableExtraStatement = extraStatement;
2802: // CREATE TABLE tableName (
2803: // name checkpointNameColumnType,
2804: // log_id logIdColumnType,
2805: // extraStatement)
2806:
2807: checkpointTableCreateStatement = createTable + " " + tableName
2808: + " (name " + nameType + ",log_id " + logIdType
2809: + extraStatement + ")";
2810: if (logger.isDebugEnabled())
2811: logger.debug(Translate.get(
2812: "recovery.jdbc.checkpointtable.statement",
2813: checkpointTableCreateStatement));
2814: }
2815:
2816: /**
2817: * Sets the dump table name and create statement.
2818: *
2819: * @param createTable statement to create the table
2820: * @param tableName name of the checkpoint table.
2821: * @param dumpNameColumnType the dump name column type
2822: * @param dumpDateColumnType the dump data column type
2823: * @param dumpPathColumnType the dump path column type
2824: * @param dumpFormatColumnType the dump tpe column type
2825: * @param checkpointNameColumnType the checkpoint name column type
2826: * @param backendNameColumnType the backend name column type
2827: * @param tablesColumnName the database tables column name
2828: * @param tablesColumnType the database tables column type
2829: * @param extraStatement like primary keys
2830: */
2831: public void setDumpTableCreateStatement(String createTable,
2832: String tableName, String dumpNameColumnType,
2833: String dumpDateColumnType, String dumpPathColumnType,
2834: String dumpFormatColumnType,
2835: String checkpointNameColumnType,
2836: String backendNameColumnType, String tablesColumnName,
2837: String tablesColumnType, String extraStatement) {
2838: this .dumpTableCreateTable = createTable;
2839: this .dumpTableName = tableName;
2840: this .dumpTableDumpNameColumnType = dumpNameColumnType;
2841: this .dumpTableDumpDateColumnType = dumpDateColumnType;
2842: this .dumpTableDumpPathColumnType = dumpPathColumnType;
2843: this .dumpTableDumpFormatColumnType = dumpFormatColumnType;
2844: this .dumpTableCheckpointNameColumnType = checkpointNameColumnType;
2845: this .dumpTableBackendNameColumnType = backendNameColumnType;
2846: this .dumpTableTablesColumnName = tablesColumnName;
2847: this .dumpTableTablesColumnType = tablesColumnType;
2848: this .dumpTableExtraStatementDefinition = extraStatement;
2849:
2850: // CREATE TABLE DumpTable (
2851: // dump_name TEXT NOT NULL,
2852: // dump_date DATE,
2853: // dump_path TEXT NOT NULL,
2854: // dump_type TEXT NOT NULL,
2855: // checkpoint_name TEXT NOT NULL,
2856: // backend_name TEXT NOT NULL,
2857: // tables TEXT NOT NULL
2858: // )
2859:
2860: dumpTableCreateStatement = dumpTableCreateTable + " "
2861: + dumpTableName + " (dump_name "
2862: + dumpTableDumpNameColumnType + ",dump_date "
2863: + dumpDateColumnType + ",dump_path "
2864: + dumpPathColumnType + ",dump_format "
2865: + dumpFormatColumnType + ",checkpoint_name "
2866: + checkpointNameColumnType + ",backend_name "
2867: + backendNameColumnType + ","
2868: + dumpTableTablesColumnName + " " + tablesColumnType
2869: + extraStatement + ")";
2870: if (logger.isDebugEnabled())
2871: logger.debug(Translate.get(
2872: "recovery.jdbc.dumptable.statement",
2873: dumpTableCreateStatement));
2874: }
2875:
2876: /**
2877: * Sets the log table name and create statement.
2878: *
2879: * @param createTable statement to create the table
2880: * @param tableName name of the log table
2881: * @param idType type of the id column
2882: * @param vloginType type of the login column
2883: * @param sqlName name of the sql statement column
2884: * @param sqlType type of the sql column
2885: * @param sqlParamsType type of the sql_param column
2886: * @param autoConnTranColumnType type of the auto_conn_tran column
2887: * @param transactionIdType type of the transaction column
2888: * @param extraStatement extra statement like primary keys ...
2889: * @param requestIdType request id column type
2890: * @param execTimeType execution time in ms column type
2891: * @param updateCountType update count column type
2892: */
2893: public void setLogTableCreateStatement(String createTable,
2894: String tableName, String idType, String vloginType,
2895: String sqlName, String sqlType, String sqlParamsType,
2896: String autoConnTranColumnType, String transactionIdType,
2897: String requestIdType, String execTimeType,
2898: String updateCountType, String extraStatement) {
2899: this .logTableCreateTable = createTable;
2900: this .logTableName = tableName;
2901: this .logTableLogIdType = idType;
2902: this .logTableVloginType = vloginType;
2903: this .logTableSqlColumnName = sqlName;
2904: this .logTableSqlType = sqlType;
2905: this .logTableAutoConnTranColumnType = autoConnTranColumnType;
2906: this .logTableTransactionIdType = transactionIdType;
2907: this .logTableRequestIdType = requestIdType;
2908: this .logTableExecTimeType = execTimeType;
2909: this .logTableUpdateCountType = updateCountType;
2910: this .logTableExtraStatement = extraStatement;
2911: logTableCreateStatement = createTable + " " + tableName
2912: + " (log_id " + idType + ",vlogin " + vloginType + ","
2913: + logTableSqlColumnName + " " + sqlType + ","
2914: + logTableSqlColumnName + "_param " + sqlParamsType
2915: + ",auto_conn_tran " + autoConnTranColumnType
2916: + ",transaction_id " + transactionIdType
2917: + ",request_id " + requestIdType + ",exec_status "
2918: + autoConnTranColumnType + ",exec_time " + execTimeType
2919: + ",update_count " + updateCountType + extraStatement
2920: + ")";
2921: if (logger.isDebugEnabled())
2922: logger.debug(Translate.get(
2923: "recovery.jdbc.logtable.statement",
2924: logTableCreateStatement));
2925: }
2926:
2927: //
2928: //
2929: // Log utility functions
2930: //
2931: //
2932:
2933: /**
2934: * Checks if a checkpoint with the name checkpointName is already stored in
2935: * the database.
2936: *
2937: * @param checkpointName name of the checkpoint.
2938: * @return true if no checkpoint was found.
2939: */
2940: private boolean validCheckpointName(String checkpointName)
2941: throws SQLException {
2942: PreparedStatement stmt = null;
2943: ResultSet rs = null;
2944: try {
2945: stmt = getDatabaseConnection().prepareStatement(
2946: "SELECT * FROM " + checkpointTableName
2947: + " WHERE name LIKE ?");
2948: stmt.setString(1, checkpointName);
2949: rs = stmt.executeQuery();
2950:
2951: // If the query returned any rows, the checkpoint name is already
2952: // in use and therefore invalid.
2953: boolean checkpointExists = rs.next();
2954: rs.close();
2955: return !checkpointExists;
2956: } catch (SQLException e) {
2957: invalidateInternalConnection();
2958: throw new SQLException(Translate.get(
2959: "recovery.jdbc.checkpoint.check.failed", e));
2960: } finally {
2961: try {
2962: if (stmt != null)
2963: stmt.close();
2964: } catch (SQLException ignore) {
2965: }
2966: }
2967: }
2968:
2969: //
2970: //
2971: // Info/Monitoring/Debug related functions
2972: //
2973: //
2974:
2975: /**
2976: * @see org.continuent.sequoia.controller.jmx.AbstractStandardMBean#getAssociatedString()
2977: */
2978: public String getAssociatedString() {
2979: return "jdbcrecoverylog";
2980: }
2981:
2982: /**
2983: * Allow to get the content of the recovery log for viewing
2984: *
2985: * @return <code>String[][]</code>
2986: * @see org.continuent.sequoia.controller.monitoring.datacollector.DataCollector#retrieveRecoveryLogData(String)
2987: */
2988: public String[][] getData() {
2989: Statement stmt = null;
2990: ResultSet rs = null;
2991: try {
2992: stmt = getDatabaseConnection().createStatement();
2993: rs = stmt.executeQuery("select * from " + logTableName);
2994: ArrayList list = new ArrayList();
2995: while (rs.next()) {
2996: // 3: Query 2: User 1: ID 4: TID
2997: list.add(new String[] { rs.getString(3),
2998: rs.getString(2), rs.getString(1),
2999: rs.getString(4) });
3000: }
3001: String[][] result = new String[list.size()][4];
3002: for (int i = 0; i < list.size(); i++)
3003: result[i] = (String[]) list.get(i);
3004: return result;
3005: } catch (SQLException e) {
3006: return null;
3007: } finally {
3008: try {
3009: rs.close();
3010: } catch (SQLException ignore) {
3011: }
3012: try {
3013: stmt.close();
3014: } catch (SQLException ignore) {
3015: }
3016: }
3017: }
3018:
3019: String[] getColumnNames() {
3020: Statement stmt = null;
3021: ResultSet rs = null;
3022: try {
3023: stmt = getDatabaseConnection().createStatement();
3024: rs = stmt.executeQuery("select * from " + logTableName
3025: + " where log_id=0");
3026: int columnCount = rs.getMetaData().getColumnCount();
3027: String[] columnNames = new String[columnCount];
3028: for (int i = 0; i < columnCount; i++) {
3029: columnNames[i] = rs.getMetaData().getColumnName(i + 1);
3030: }
3031: return columnNames;
3032: } catch (SQLException e) {
3033: return new String[0];
3034: } finally {
3035: try {
3036: rs.close();
3037: } catch (SQLException ignore) {
3038: }
3039: try {
3040: stmt.close();
3041: } catch (SQLException ignore) {
3042: }
3043: }
3044: }
3045:
3046: /**
3047: * returns an long[2] for min and max log_id in the log table
3048: *
3049: * @return an long[2] for min and max log_id in the log table
3050: * @throws SQLException if an error occurs while computing the index of the
3051: * log table
3052: */
3053: long[] getIndexes() throws SQLException {
3054: Statement stmt;
3055: stmt = getDatabaseConnection().createStatement();
3056: ResultSet rs = null;
3057: try {
3058: rs = stmt
3059: .executeQuery("select min(log_id),max(log_id) from "
3060: + logTableName);
3061: rs.next();
3062: long min = rs.getLong(1);
3063: long max = rs.getLong(2);
3064: return new long[] { min, max };
3065: } finally {
3066: if (rs != null) {
3067: rs.close();
3068: }
3069: if (stmt != null) {
3070: stmt.close();
3071: }
3072: }
3073: }
3074:
3075: /**
3076: * Exposes the contents of the recovery log table as a matrix of String.<br />
3077: * <em>this method should be only used for management/debugging purpose</em>
3078: *
3079: * @param from the starting index from which the log entries are retrieved
3080: * (corresponds to the log_id column)
3081: * @param maxrows the maximum number of rows to retrieve
3082: * @return a matrix of String representing the content of the recovery log
3083: * table
3084: */
3085: String[][] getLogEntries(long from, int maxrows) {
3086: Statement stmt = null;
3087: ResultSet rs = null;
3088: try {
3089: stmt = getDatabaseConnection().createStatement();
3090: stmt.setMaxRows(maxrows);
3091: rs = stmt.executeQuery("select * from " + logTableName
3092: + " where log_id >= " + from);
3093: int columnCount = rs.getMetaData().getColumnCount();
3094: List logEntries = new ArrayList();
3095: while (rs.next()) {
3096: String[] logEntry = new String[columnCount];
3097: for (int i = 0; i < columnCount; i++) {
3098: logEntry[i] = rs.getString(i + 1);
3099: }
3100: logEntries.add(logEntry);
3101: }
3102: String[][] result = new String[logEntries.size()][columnCount];
3103: for (int i = 0; i < logEntries.size(); i++)
3104: result[i] = (String[]) logEntries.get(i);
3105: return result;
3106: } catch (SQLException e) {
3107: return null;
3108: } finally {
3109: try {
3110: rs.close();
3111: } catch (SQLException ignore) {
3112: }
3113: try {
3114: stmt.close();
3115: } catch (SQLException ignore) {
3116: }
3117: }
3118: }
3119:
3120: String[][] getLogEntriesInTransaction(long tid) {
3121: Statement stmt = null;
3122: ResultSet rs = null;
3123: try {
3124: stmt = getDatabaseConnection().createStatement();
3125: rs = stmt.executeQuery("select * from " + logTableName
3126: + " where transaction_id = " + tid);
3127: int columnCount = rs.getMetaData().getColumnCount();
3128: List logEntries = new ArrayList();
3129: while (rs.next()) {
3130: String[] logEntry = new String[columnCount];
3131: for (int i = 0; i < columnCount; i++) {
3132: logEntry[i] = rs.getString(i + 1);
3133: }
3134: logEntries.add(logEntry);
3135: }
3136: String[][] result = new String[logEntries.size()][columnCount];
3137: for (int i = 0; i < logEntries.size(); i++)
3138: result[i] = (String[]) logEntries.get(i);
3139: return result;
3140: } catch (SQLException e) {
3141: return null;
3142: } finally {
3143: try {
3144: rs.close();
3145: } catch (SQLException ignore) {
3146: }
3147: try {
3148: stmt.close();
3149: } catch (SQLException ignore) {
3150: }
3151: }
3152: }
3153:
3154: /**
3155: * Gives the log as an XML String
3156: *
3157: * @return the XML representation of the log
3158: */
3159: public String getXml() {
3160: StringBuffer info = new StringBuffer();
3161: info.append("<" + DatabasesXmlTags.ELT_RecoveryLog + " "
3162: + DatabasesXmlTags.ATT_driver + "=\"" + driverClassName
3163: + "\" " + DatabasesXmlTags.ATT_url + "=\"" + url
3164: + "\" ");
3165: if (driverName != null) {
3166: info.append(DatabasesXmlTags.ATT_driverPath + "=\""
3167: + driverName + "\" ");
3168: }
3169: info.append(DatabasesXmlTags.ATT_login + "=\"" + login + "\" "
3170: + DatabasesXmlTags.ATT_password + "=\"" + password
3171: + "\" " + DatabasesXmlTags.ATT_requestTimeout + "=\""
3172: + (timeout / 1000) + "\" "
3173: + DatabasesXmlTags.ATT_recoveryBatchSize + "=\""
3174: + recoveryBatchSize + "\">");
3175: // Recovery Log table
3176: info.append("<" + DatabasesXmlTags.ELT_RecoveryLogTable + " "
3177: + DatabasesXmlTags.ATT_createTable + "=\""
3178: + logTableCreateTable + "\" "
3179: + DatabasesXmlTags.ATT_tableName + "=\"" + logTableName
3180: + "\" " + DatabasesXmlTags.ATT_logIdColumnType + "=\""
3181: + logTableLogIdType + "\" "
3182: + DatabasesXmlTags.ATT_vloginColumnType + "=\""
3183: + logTableVloginType + "\" "
3184: + DatabasesXmlTags.ATT_sqlColumnType + "=\""
3185: + logTableSqlType + "\" "
3186: + DatabasesXmlTags.ATT_autoConnTranColumnType + "=\""
3187: + logTableAutoConnTranColumnType + "\" "
3188: + DatabasesXmlTags.ATT_transactionIdColumnType + "=\""
3189: + logTableTransactionIdType + "\" "
3190: + DatabasesXmlTags.ATT_requestIdColumnType + "=\""
3191: + logTableRequestIdType + "\" "
3192: + DatabasesXmlTags.ATT_execTimeColumnType + "=\""
3193: + logTableExecTimeType + "\" "
3194: + DatabasesXmlTags.ATT_updateCountColumnType + "=\""
3195: + logTableUpdateCountType + "\" "
3196: + DatabasesXmlTags.ATT_extraStatementDefinition + "=\""
3197: + logTableExtraStatement + "\"/>");
3198: // Checkpoint table
3199: info.append("<" + DatabasesXmlTags.ELT_CheckpointTable + " "
3200: + DatabasesXmlTags.ATT_createTable + "=\""
3201: + checkpointTableCreateTable + "\" "
3202: + DatabasesXmlTags.ATT_tableName + "=\""
3203: + checkpointTableName + "\" "
3204: + DatabasesXmlTags.ATT_checkpointNameColumnType + "=\""
3205: + checkpointTableNameType + "\" "
3206: + DatabasesXmlTags.ATT_logIdColumnType + "=\""
3207: + checkpointTableLogIdType + "\" "
3208: + DatabasesXmlTags.ATT_extraStatementDefinition + "=\""
3209: + checkpointTableExtraStatement + "\"" + "/>");
3210: // BackendLog table
3211: info.append("<" + DatabasesXmlTags.ELT_BackendTable + " "
3212: + DatabasesXmlTags.ATT_createTable + "=\""
3213: + backendTableCreateTable + "\" "
3214: + DatabasesXmlTags.ATT_tableName + "=\""
3215: + backendTableName + "\" "
3216: + DatabasesXmlTags.ATT_databaseNameColumnType + "=\""
3217: + backendTableDatabaseName + "\" "
3218: + DatabasesXmlTags.ATT_backendNameColumnType + "=\""
3219: + backendTableBackendName + "\" "
3220: + DatabasesXmlTags.ATT_backendStateColumnType + "=\""
3221: + backendTableBackendState + "\" "
3222: + DatabasesXmlTags.ATT_checkpointNameColumnType + "=\""
3223: + backendTableCheckpointName + "\" "
3224: + DatabasesXmlTags.ATT_extraStatementDefinition + "=\""
3225: + backendTableExtraStatement + "\"" + "/>");
3226: // Dump table
3227: info.append("<" + DatabasesXmlTags.ELT_DumpTable + " "
3228: + DatabasesXmlTags.ATT_createTable + "=\""
3229: + dumpTableCreateTable + "\" "
3230: + DatabasesXmlTags.ATT_tableName + "=\""
3231: + dumpTableName + "\" "
3232: + DatabasesXmlTags.ATT_dumpNameColumnType + "=\""
3233: + dumpTableDumpNameColumnType + "\" "
3234: + DatabasesXmlTags.ATT_dumpDateColumnType + "=\""
3235: + dumpTableDumpDateColumnType + "\" "
3236: + DatabasesXmlTags.ATT_dumpPathColumnType + "=\""
3237: + dumpTableDumpPathColumnType + "\" "
3238: + DatabasesXmlTags.ATT_dumpFormatColumnType + "=\""
3239: + dumpTableDumpFormatColumnType + "\" "
3240: + DatabasesXmlTags.ATT_checkpointNameColumnType + "=\""
3241: + dumpTableCheckpointNameColumnType + "\" "
3242: + DatabasesXmlTags.ATT_backendNameColumnType + "=\""
3243: + dumpTableBackendNameColumnType + "\" "
3244: + DatabasesXmlTags.ATT_tablesColumnName + "=\""
3245: + dumpTableTablesColumnName + "\" "
3246: + DatabasesXmlTags.ATT_tablesColumnType + "=\""
3247: + dumpTableTablesColumnType + "\" "
3248: + DatabasesXmlTags.ATT_extraStatementDefinition + "=\""
3249: + dumpTableExtraStatementDefinition + "\"" + "/>");
3250: info.append("</" + DatabasesXmlTags.ELT_RecoveryLog + ">");
3251:
3252: return info.toString();
3253: }
3254:
3255: //
3256: // VDB restart order management
3257: //
3258:
3259: public boolean isLastManDown() throws SQLException {
3260: ResultSet rs = getDatabaseConnection().createStatement()
3261: .executeQuery(
3262: "select * from " + checkpointTableName
3263: + " where name ='last-man-down'");
3264: return rs.next();
3265: }
3266:
3267: public void setLastManDown() throws SQLException {
3268: getDatabaseConnection().createStatement().execute(
3269: "insert into " + checkpointTableName
3270: + " values ('last-man-down', 0)");
3271: }
3272:
3273: public void clearLastManDown() throws SQLException {
3274: getDatabaseConnection().createStatement().execute(
3275: "delete from " + checkpointTableName
3276: + " where name='last-man-down'");
3277: }
3278: }
|