001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): ______________________.
022: */package org.continuent.sequoia.controller.recoverylog;
023:
024: import java.sql.PreparedStatement;
025: import java.sql.ResultSet;
026: import java.sql.SQLException;
027: import java.util.Iterator;
028: import java.util.LinkedList;
029:
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.common.log.Trace;
032: import org.continuent.sequoia.controller.recoverylog.events.LogEvent;
033: import org.continuent.sequoia.controller.recoverylog.events.LogRequestEvent;
034:
035: /**
036: * Logger thread for the RecoveryLog.
037: *
038: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
039: * @version 1.0
040: */
041: public class LoggerThread extends Thread {
042: private boolean killed = false; // Control thread death
043: /*
044: * the only place where we must remove the first element of the queue is in
045: * the run() method. do note remove its first element anywhere else!
046: */
047: private LinkedList logQueue;
048: private Trace logger;
049: private PreparedStatement logStmt;
050: private PreparedStatement updateStmt;
051: private RecoveryLog recoveryLog;
052: private LogEvent currentEvent = null;
053: private LogEvent lastFailed;
054:
055: /**
056: * Creates a new <code>LoggerThread</code> object
057: *
058: * @param log the RecoveryLog that instanciates this thread
059: */
060: public LoggerThread(RecoveryLog log) {
061: super ("LoggerThread");
062: this .recoveryLog = log;
063: this .logger = RecoveryLog.logger;
064: logStmt = null;
065: updateStmt = null;
066: logQueue = new LinkedList();
067: }
068:
069: /**
070: * Returns the logger value.
071: *
072: * @return Returns the logger.
073: */
074: public Trace getLogger() {
075: return logger;
076: }
077:
078: /**
079: * Tells whether there are pending logs
080: *
081: * @return true if no more jobs in the log queue
082: */
083: public synchronized boolean getLogQueueIsEmpty() {
084: if (logQueue.isEmpty()) {
085: // Notifies the Recovery log that the queue is empty.
086: notify();
087: return true;
088: } else {
089: return false;
090: }
091: }
092:
093: /**
094: * Return a PreparedStatement to log an entry as follows:
095: * <p>
096: * INSERT INTO LogTableName VALUES(?,?,?,?,?,?,?)
097: *
098: * @return a PreparedStatement
099: * @throws SQLException if an error occurs
100: */
101: public PreparedStatement getLogPreparedStatement()
102: throws SQLException {
103: if (logStmt == null) {
104: logStmt = recoveryLog.getDatabaseConnection()
105: .prepareStatement(
106: "INSERT INTO "
107: + recoveryLog.getLogTableName()
108: + " VALUES(?,?,?,?,?,?,?,?,?,?)");
109: }
110: return logStmt;
111: }
112:
113: /**
114: * Return a PreparedStatement to update an entry as follows:
115: * <p>
116: * UPDATE LogTableName SET exec_status=?,exec_time=? WHERE log_id=?
117: *
118: * @return a PreparedStatement
119: * @throws SQLException if an error occurs
120: */
121: public PreparedStatement getUpdatePreparedStatement()
122: throws SQLException {
123: if (updateStmt == null) {
124: updateStmt = recoveryLog
125: .getDatabaseConnection()
126: .prepareStatement(
127: "UPDATE "
128: + recoveryLog.getLogTableName()
129: + " SET exec_status=?,update_count=?,exec_time=? WHERE log_id=?");
130: }
131: return updateStmt;
132: }
133:
134: /**
135: * Returns the recoveryLog value.
136: *
137: * @return Returns the recoveryLog.
138: */
139: public RecoveryLog getRecoveryLog() {
140: return recoveryLog;
141: }
142:
143: /**
144: * Returns true if there is any log event in the queue that belongs to the
145: * given transaction.
146: *
147: * @param tid transaction id to look for
148: * @return true if a log entry belongs to this transaction
149: */
150: public synchronized boolean hasLogEntryForTransaction(long tid) {
151: for (Iterator iter = logQueue.iterator(); iter.hasNext();) {
152: LogEvent logEvent = (LogEvent) iter.next();
153: if (logEvent.belongToTransaction(tid))
154: return true;
155: }
156: return false;
157: }
158:
159: /**
160: * Invalidate both logStmt and unlogStmt so that they can be renewed from a
161: * fresh connection.
162: *
163: * @see #getLogPreparedStatement()
164: * @see #getUpdatePreparedStatement()
165: */
166: public void invalidateLogStatements() {
167: try {
168: logStmt.close();
169: } catch (Exception ignore) {
170: }
171: try {
172: updateStmt.close();
173: } catch (Exception ignore) {
174: }
175: logStmt = null;
176: updateStmt = null;
177: recoveryLog.invalidateInternalConnection();
178: }
179:
180: /**
181: * Log a write-query into the recovery log. This posts the specified logObject
182: * (query) into this loggerThread queue. The actual write to the recoverly-log
183: * db is performed asynchronously by the thread.
184: *
185: * @param logObject the log event to be processed
186: */
187: public synchronized void log(LogEvent logObject) {
188: logQueue.addLast(logObject);
189: notify();
190: }
191:
192: /**
193: * Put back a log entry at the head of the queue in case a problem happened
194: * with this entry and we need to retry it right away.
195: *
196: * @param event the event to be used next by the logger thread.
197: * @param e exception causing the event to fail and to be retried
198: */
199: public synchronized void putBackAtHeadOfQueue(LogEvent event,
200: Exception e) {
201: if (lastFailed != event) {
202: logQueue.addFirst(event);
203: notify();
204: lastFailed = event;
205: } else {
206: if (event instanceof LogRequestEvent)
207: logger
208: .error("WARNING! Your recovery log is probably corrupted, you should perform a restore log operation");
209: logger.error("Logger thread was unable to log "
210: + event.toString() + " because of " + e, e);
211: }
212: }
213:
214: /**
215: * Remove all queries that have not been logged yet and belonging to the
216: * specified transaction.
217: *
218: * @param tid transaction id to rollback
219: */
220: public synchronized void removeQueriesOfTransactionFromQueue(
221: long tid) {
222: if (logger.isDebugEnabled())
223: logger.debug(Translate.get(
224: "recovery.jdbc.loggerthread.removing", tid));
225: Iterator iter = logQueue.iterator();
226: // do not remove the first element of the queue
227: // (must only be done by the run() method)
228: if (iter.hasNext()) {
229: iter.next();
230: }
231: while (iter.hasNext()) {
232: LogEvent event = (LogEvent) iter.next();
233: if (event.belongToTransaction(tid)) {
234: iter.remove();
235: }
236: }
237: }
238:
239: /**
240: * Remove a possibly empty transaction from the recovery log. This method
241: * returns true if no entry or just a begin is found for that transaction. If
242: * a begin was found it will be removed from the log.
243: *
244: * @param transactionId the id of the transaction
245: * @return true if the transaction was empty
246: * @throws SQLException if an error occurs
247: */
248: public boolean removeEmptyTransaction(long transactionId)
249: throws SQLException {
250: if (hasLogEntryForTransaction(transactionId))
251: return false;
252:
253: PreparedStatement stmt = null;
254: ResultSet rs = null;
255: try {
256: stmt = recoveryLog.getDatabaseConnection()
257: .prepareStatement(
258: "SELECT * FROM "
259: + recoveryLog.getLogTableName()
260: + " WHERE transaction_id=?");
261: stmt.setLong(1, transactionId);
262: rs = stmt.executeQuery();
263: if (!rs.next())
264: return true; // no entry for that transaction
265:
266: // Check if the first entry found is a begin
267: String sql = rs.getString(recoveryLog
268: .getLogTableSqlColumnName());
269: if ((sql == null) || !sql.startsWith(RecoveryLog.BEGIN))
270: return false;
271:
272: if (rs.next())
273: return false; // multiple entries in this transaction
274:
275: rs.close();
276: stmt.close();
277:
278: // There is a single BEGIN in the log for that transaction, remove it.
279: stmt = recoveryLog.getDatabaseConnection()
280: .prepareStatement(
281: "DELETE FROM "
282: + recoveryLog.getLogTableName()
283: + " WHERE transaction_id=?");
284: stmt.setLong(1, transactionId);
285: stmt.executeUpdate();
286: return true;
287: } catch (SQLException e) {
288: throw new SQLException(Translate.get(
289: "recovery.jdbc.transaction.remove.failed",
290: new String[] { String.valueOf(transactionId),
291: e.getMessage() }));
292: } finally {
293: try {
294: if (rs != null)
295: rs.close();
296: } catch (Exception ignore) {
297: }
298: try {
299: if (stmt != null)
300: stmt.close();
301: } catch (Exception ignore) {
302: }
303: }
304: }
305:
306: /**
307: * Delete all entries from the CheckpointTable.
308: *
309: * @throws SQLException if an error occurs
310: */
311: public void deleteCheckpointTable() throws SQLException {
312: // First delete from the checkpoint table
313: PreparedStatement stmt = null;
314: try {
315: stmt = recoveryLog.getDatabaseConnection()
316: .prepareStatement(
317: "DELETE FROM "
318: + recoveryLog
319: .getCheckpointTableName());
320: stmt.executeUpdate();
321: } catch (SQLException e) {
322: String msg = "Failed to delete checkpoint table";
323: logger.warn(msg, e);
324: throw new SQLException(msg);
325: } finally {
326: try {
327: if (stmt != null)
328: stmt.close();
329: } catch (Exception ignore) {
330: }
331: }
332: }
333:
334: /**
335: * Store a checkpoint in the recovery log using the provided local log id.<br>
336: * Moreover, in case of error, additionally closes and invalidates log and
337: * unlog statements (internal) before calling
338: * RecoveryLog#invalidateInternalConnection().
339: *
340: * @param checkpointName checkpoint name to insert
341: * @param checkpointLogId checkpoint log identifier
342: * @throws SQLException if a database access error occurs
343: * @see RecoveryLog#storeCheckpointWithLogEntry(String, CheckpointLogEntry)
344: * @see #invalidateLogStatements()
345: */
346: public void storeCheckpointWithLogId(String checkpointName,
347: long checkpointLogId) throws SQLException {
348: PreparedStatement stmt = null;
349: try {
350: if (logger.isDebugEnabled())
351: logger.debug("Storing checkpoint " + checkpointName
352: + " at request id " + checkpointLogId);
353: stmt = recoveryLog.getDatabaseConnection()
354: .prepareStatement(
355: "INSERT INTO "
356: + recoveryLog
357: .getCheckpointTableName()
358: + " VALUES(?,?)");
359: stmt.setString(1, checkpointName);
360: stmt.setLong(2, checkpointLogId);
361: stmt.executeUpdate();
362: } catch (SQLException e) {
363: invalidateLogStatements();
364: throw new SQLException(Translate.get(
365: "recovery.jdbc.checkpoint.store.failed",
366: new String[] { checkpointName, e.getMessage() }));
367: } finally {
368: try {
369: if (stmt != null)
370: stmt.close();
371: } catch (Exception ignore) {
372: }
373: }
374: }
375:
376: /**
377: * Remove a checkpoint in the recovery log.<br />
378: * In case of error, additionely close and invalidates log and unlog
379: * statements (internal) before calling
380: * RecoveryLog#invalidateInternalConnection().
381: *
382: * @param checkpointName name of the checkpoint to remove
383: * @throws SQLException if a database access error occurs
384: * @see org.continuent.sequoia.controller.recoverylog.events.RemoveCheckpointEvent
385: */
386: public void removeCheckpoint(String checkpointName)
387: throws SQLException {
388: PreparedStatement stmt = null;
389:
390: try {
391: stmt = recoveryLog.getDatabaseConnection()
392: .prepareStatement(
393: "DELETE FROM "
394: + recoveryLog
395: .getCheckpointTableName()
396: + " WHERE name like ?");
397: stmt.setString(1, checkpointName);
398: stmt.executeUpdate();
399: stmt.close();
400: } catch (SQLException e) {
401: invalidateLogStatements();
402: throw new SQLException(Translate.get(
403: "recovery.jdbc.checkpoint.remove.failed",
404: new String[] { checkpointName, e.getMessage() }));
405: } finally {
406: try {
407: if (stmt != null)
408: stmt.close();
409: } catch (Exception ignore) {
410: }
411: }
412: }
413:
414: /**
415: * Delete all LogEntries with an identifier lower than oldId (inclusive).
416: * oldId is normally derived from a checkpoint name, which marks the last
417: * request before the checkpoint.
418: *
419: * @param oldId the id up to which entries should be removed.
420: * @throws SQLException if an error occurs
421: */
422: public void deleteLogEntriesBeforeId(long oldId)
423: throws SQLException {
424: PreparedStatement stmt = null;
425: try {
426: stmt = recoveryLog.getDatabaseConnection()
427: .prepareStatement(
428: "DELETE FROM "
429: + recoveryLog.getLogTableName()
430: + " WHERE log_id<=?");
431: stmt.setLong(1, oldId);
432: stmt.executeUpdate();
433: } catch (SQLException e) {
434: // TODO: Check error message below
435: throw new SQLException(Translate.get(
436: "recovery.jdbc.transaction.remove.failed",
437: new String[] { String.valueOf(oldId),
438: e.getMessage() }));
439: } finally {
440: try {
441: if (stmt != null)
442: stmt.close();
443: } catch (Exception ignore) {
444: }
445: }
446: }
447:
448: /**
449: * Return the real number of log entries between 2 log ids (usually matching
450: * checkpoint indices). The SELECT excludes both boundaries.
451: *
452: * @param lowerLogId the lower log id
453: * @param upperLogId the upper log id
454: * @return the number of entries between the 2 ids
455: * @throws SQLException if an error occurs querying the recovery log
456: */
457: public long getNumberOfLogEntries(long lowerLogId, long upperLogId)
458: throws SQLException {
459: ResultSet rs = null;
460: PreparedStatement stmt = null;
461: try {
462: stmt = recoveryLog.getDatabaseConnection()
463: .prepareStatement(
464: "SELECT COUNT(*) FROM "
465: + recoveryLog.getLogTableName()
466: + " WHERE log_id>? AND log_id<?");
467: // Note that the statement is closed in the finally block
468: stmt.setLong(1, lowerLogId);
469: stmt.setLong(2, upperLogId);
470: rs = stmt.executeQuery();
471: if (!rs.next())
472: throw new SQLException(
473: "Failed to retrieve number of log entries (no rows returned)");
474:
475: return rs.getLong(1);
476: } catch (SQLException e) {
477: throw e;
478: } finally {
479: try {
480: if (rs != null)
481: rs.close();
482: } catch (Exception ignore) {
483: }
484: try {
485: if (stmt != null)
486: stmt.close();
487: } catch (Exception ignore) {
488: }
489: }
490: }
491:
492: /**
493: * Shift LogEntries identifiers from the specified value (value is added to
494: * existing identifiers).
495: *
496: * @param shiftValue the value to shift
497: * @throws SQLException if an error occurs
498: */
499: public void shiftLogEntriesIds(long shiftValue) throws SQLException {
500: PreparedStatement stmt = null;
501: try {
502: stmt = recoveryLog.getDatabaseConnection()
503: .prepareStatement(
504: "UPDATE " + recoveryLog.getLogTableName()
505: + " SET log_id=log_id+?");
506: stmt.setLong(1, shiftValue);
507: stmt.executeUpdate();
508: } catch (SQLException e) {
509: throw new SQLException(Translate.get(
510: "recovery.jdbc.loggerthread.shift.failed", e
511: .getMessage()));
512: } finally {
513: try {
514: if (stmt != null)
515: stmt.close();
516: } catch (Exception ignore) {
517: }
518: }
519: }
520:
521: /**
522: * Shift LogEntries identifiers from the specified shiftValue (value is added
523: * to existing identifiers) starting with identifier with a value strictly
524: * greater than the given id.
525: *
526: * @param fromId id to start shifting from
527: * @param shiftValue the value to shift
528: * @throws SQLException if an error occurs
529: */
530: public void shiftLogEntriesAfterId(long fromId, long shiftValue)
531: throws SQLException {
532: PreparedStatement stmt = null;
533: try {
534: stmt = recoveryLog
535: .getDatabaseConnection()
536: .prepareStatement(
537: "UPDATE "
538: + recoveryLog.getLogTableName()
539: + " SET log_id=log_id+? WHERE log_id>?");
540: stmt.setLong(1, shiftValue);
541: stmt.setLong(2, fromId);
542: stmt.executeUpdate();
543: } catch (SQLException e) {
544: throw new SQLException(Translate.get(
545: "recovery.jdbc.loggerthread.shift.failed", e
546: .getMessage()));
547: } finally {
548: try {
549: if (stmt != null)
550: stmt.close();
551: } catch (Exception ignore) {
552: }
553: }
554: }
555:
556: /**
557: * Delete all log entries that have an id strictly between the 2 given
558: * boundaries (commonCheckpointId<id<nowCheckpointId). All checkpoints
559: * pointing to an id in the wiped zone will be deleted as well.
560: *
561: * @param commonCheckpointId lower id bound
562: * @param nowCheckpointId upper id bound
563: * @throws SQLException if an error occurs accessing the log
564: */
565: public void deleteLogEntriesAndCheckpointBetween(
566: long commonCheckpointId, long nowCheckpointId)
567: throws SQLException {
568: PreparedStatement stmt = null;
569: try {
570: // Delete log entries first
571: stmt = recoveryLog.getDatabaseConnection()
572: .prepareStatement(
573: "DELETE FROM "
574: + recoveryLog.getLogTableName()
575: + " WHERE ?<log_id AND log_id<?");
576: stmt.setLong(1, commonCheckpointId);
577: stmt.setLong(2, nowCheckpointId);
578: int rows = stmt.executeUpdate();
579: stmt.close();
580:
581: if (logger.isInfoEnabled()) {
582: logger
583: .info(rows
584: + " outdated log entries have been removed from the recovery log");
585:
586: // Print checkpoints that will be deleted
587: stmt = recoveryLog
588: .getDatabaseConnection()
589: .prepareStatement(
590: "SELECT * FROM "
591: + recoveryLog
592: .getCheckpointTableName()
593: + " WHERE ?<log_id AND log_id<?");
594: stmt.setLong(1, commonCheckpointId);
595: stmt.setLong(2, nowCheckpointId);
596: ResultSet rs = stmt.executeQuery();
597: while (rs.next()) {
598: logger.info("Checkpoint " + rs.getString(1) + " ("
599: + rs.getLong(2) + ") will be deleted.");
600: }
601: if (rs != null)
602: rs.close();
603: stmt.close();
604: }
605:
606: // Now delete checkpoints
607: stmt = recoveryLog.getDatabaseConnection()
608: .prepareStatement(
609: "DELETE FROM "
610: + recoveryLog
611: .getCheckpointTableName()
612: + " WHERE ?<log_id AND log_id<?");
613: stmt.setLong(1, commonCheckpointId);
614: stmt.setLong(2, nowCheckpointId);
615: rows = stmt.executeUpdate();
616:
617: if (logger.isInfoEnabled())
618: logger
619: .info(rows
620: + " out of sync checkpoints have been removed from the recovery log");
621:
622: } catch (SQLException e) {
623: throw new SQLException(Translate.get(
624: "recovery.jdbc.entries.remove.failed", e
625: .getMessage()));
626: } finally {
627: try {
628: if (stmt != null)
629: stmt.close();
630: } catch (Exception ignore) {
631: }
632: }
633: }
634:
635: /**
636: * Log the requests from queue until the thread is explicetly killed. The
637: * logger used is the one of the RecoveryLog.
638: */
639: public void run() {
640: while (!killed) {
641: synchronized (this ) {
642: while (getLogQueueIsEmpty() && !killed) {
643: try {
644: wait();
645: } catch (InterruptedException e) {
646: logger
647: .warn(
648: Translate
649: .get("recovery.jdbc.loggerthread.awaken"),
650: e);
651: }
652: }
653: if (killed)
654: break;
655: // Pump first log entry from the queue but leave it in the queue to show
656: // that we are processing it
657: currentEvent = (LogEvent) logQueue.getFirst();
658: }
659: try {
660: currentEvent.execute(this );
661: } finally { // Remove from the queue anyway
662: synchronized (this ) {
663: logQueue.removeFirst();
664: }
665: }
666: }
667:
668: // Ensure that the log is empty.
669: int finalLogSize = logQueue.size();
670: if (finalLogSize > 0) {
671: logger
672: .warn("Log queue contains requests following shutdown: "
673: + finalLogSize);
674: }
675: logger.info("Logger thread ending: " + this .getName());
676: }
677:
678: /**
679: * Shutdown the current thread. This will cause the log to terminate as soon
680: * as the current event is finished processing. Any remaining events in the
681: * log queue will be discarded.
682: */
683: public synchronized void shutdown() {
684: killed = true;
685: logger.info("Log shutdown method has been invoked");
686: notify();
687: }
688:
689: }
|