0001: package com.quadcap.sql.file;
0002:
0003: /* Copyright 1999 - 2003 Quadcap Software. All rights reserved.
0004: *
0005: * This software is distributed under the Quadcap Free Software License.
0006: * This software may be used or modified for any purpose, personal or
0007: * commercial. Open Source redistributions are permitted. Commercial
0008: * redistribution of larger works derived from, or works which bundle
0009: * this software requires a "Commercial Redistribution License"; see
0010: * http://www.quadcap.com/purchase.
0011: *
0012: * Redistributions qualify as "Open Source" under one of the following terms:
0013: *
0014: * Redistributions are made at no charge beyond the reasonable cost of
0015: * materials and delivery.
0016: *
0017: * Redistributions are accompanied by a copy of the Source Code or by an
0018: * irrevocable offer to provide a copy of the Source Code for up to three
0019: * years at the cost of materials and delivery. Such redistributions
0020: * must allow further use, modification, and redistribution of the Source
0021: * Code under substantially the same terms as this license.
0022: *
0023: * Redistributions of source code must retain the copyright notices as they
0024: * appear in each source code file, these license terms, and the
0025: * disclaimer/limitation of liability set forth as paragraph 6 below.
0026: *
0027: * Redistributions in binary form must reproduce this Copyright Notice,
0028: * these license terms, and the disclaimer/limitation of liability set
0029: * forth as paragraph 6 below, in the documentation and/or other materials
0030: * provided with the distribution.
0031: *
0032: * The Software is provided on an "AS IS" basis. No warranty is
0033: * provided that the Software is free of defects, or fit for a
0034: * particular purpose.
0035: *
0036: * Limitation of Liability. Quadcap Software shall not be liable
0037: * for any damages suffered by the Licensee or any third party resulting
0038: * from use of the Software.
0039: */
0040:
0041: import java.io.File;
0042: import java.io.FileOutputStream;
0043: import java.io.IOException;
0044: import java.io.RandomAccessFile;
0045:
0046: import java.util.ArrayList;
0047: import java.util.List;
0048: import java.util.Map;
0049: import java.util.Properties;
0050:
0051: import javax.concurrent.BoundedBuffer;
0052: import javax.concurrent.Channel;
0053: import javax.concurrent.Latch;
0054:
0055: import com.quadcap.sql.lock.Transaction;
0056: import com.quadcap.sql.lock.TransactionObserver;
0057:
0058: import com.quadcap.util.collections.LongMap;
0059:
0060: import com.quadcap.util.ConfigNumber;
0061: import com.quadcap.util.Debug;
0062: import com.quadcap.util.Util;
0063:
0064: /**
 * Rolling write-ahead log implementation.  Checkpoints plus physical logging
 * of block writes are coupled with logical logging to support transaction
 * rollback and recovery.
0068: *
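 * <p>Typical lifecycle, sketched for illustration only (in practice the
 * surrounding database code is assumed to drive these calls):</p>
 *
 * <pre>
 *   Log1 log = new Log1();
 *   log.init(db, create, props);  // attach to the Datafile and pick a Logger
 *   if (!create) log.restart();   // recover state from a previous run
 *   log.start();                  // start the log sync thread
 *   ...
 *   log.addEntry(entry);          // queue log records for the sync thread
 *   log.checkpoint();             // queue a checkpoint
 *   log.close();                  // queue a Close op and wait for the sync thread
 * </pre>
 *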
0069: * @author Stan Bailes
0070: */
0071: public class Log1 implements Log {
0072: /** the database we're logging for */
0073: private Datafile db;
0074:
0075: /** the database file we're logging for */
0076: private BlockFile dbFile;
0077:
0078: /** Log writer/reader */
0079: Logger logger;
0080:
0081: /** log file directory */
0082: private File dbRootDir;
0083:
0084: /** log sync thread */
0085: LogSync logSync;
0086:
    /** released by the log sync thread when it exits; close() waits on this */
0088: Latch closeLatch = new Latch();
0089:
0090: /** producer/consumer conduit */
0091: Channel channel = new BoundedBuffer(2048);
0092:
    /** our database's file lock */
0094: Object fileLock;
0095:
    /** Maps old row ids to relocated row ids during rollback/recovery; null otherwise. */
0097: LongMap rowIdMap = new LongMap(256);
0098:
0099: /** "before-images" file */
0100: FileOutputStream bfo;
0101: boolean bfoActive = false;
0102:
0103: boolean recovering = false;
0104:
0105: int blockSize;
0106:
0107: /*{com.quadcap.util.Config-vars.xml-1020}
0108: * <config-var>
0109: * <config-name>qed.minSyncInterval</config-name>
0110: * <config-dflt>15000 (15 seconds)</config-dflt>
0111: * <config-desc>The minimum interval (in ms) between database
0112: * sync operations.</config-desc>
0113: * </config-var>
0114: */
0115: long minSyncInterval = ConfigNumber.find("qed.minSyncInterval",
0116: "15000").longValue();
0117:
0118: /*{com.quadcap.util.Config-vars.xml-1021}
0119: * <config-var>
0120: * <config-name>qed.maxSyncInterval</config-name>
0121: * <config-dflt>60000 (60 seconds)</config-dflt>
0122: * <config-desc>The maximum interval (in ms) between database
0123: * sync operations. Use this to help
0124: * limit the logfile, scratch, and before-images files.
0125: * </config-desc>
0126: * </config-var>
0127: */
0128: long maxSyncInterval = ConfigNumber.find("qed.maxSyncInterval",
0129: "60000").longValue();
0130:
0131: int syncMap = 0 + (1 << 1) + // flush -> before.sync
0132: (1 << 2) + // flush -> logger.sync
0133: // (1 << 3) + // flush -> db.tempFile.flush()
0134: (1 << 4) + // checkpoint -> dbfile.flush(fastSync)
0135: (1 << 5) + // checkpoint -> logger.sync
0136: (1 << 6) + // checkpoint -> db.checkpoint(fastSync)
0137: (1 << 7) + // save block -> store.sync
0138: 0;
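
    // Illustrative note: each bit of syncMap enables the sync behavior tested
    // by checksync(bit); the bits currently set are listed above.  For example,
    // also setting bit 3,
    //
    //     syncMap |= (1 << 3);
    //
    // would make checksync(3) return true, so reallyFlush() would call
    // db.tempFile.flush(false) rather than flush(true) (presumably the slower,
    // fully-synced variant).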
0139:
0140: //#ifdef DEBUG
0141: String[] syncStrs = { "Log1.reallyFlush from Log2.flushLog",
0142: "Log1.reallyFlush: bfo.sync()",
0143: "Log1.reallyFlush: logger.sync()",
0144: "Log1.reallyFlush: db.tempFile.flush(!)",
0145: "Log1.reallyCheckpoint: dbfile.flush(!)",
0146: "Log1.reallyCheckpoint: logger.sync()",
0147: "Log1.reallyCheckpoint: dbfile.checkpoint(!)",
0148: "Log1.saveBlock: bfo.sync" };
0149:
0150: int[] syncCnts = new int[syncStrs.length];
0151:
0152: //#endif
0153: protected boolean checksync(int x) {
        boolean ret = (syncMap & (1 << x)) != 0;
        //#ifdef DEBUG
        syncCnts[x]++;
        //Debug.println("checkSync[" + ret + ": " + syncCnts[x] + " - " + syncStrs[x]);
        //#endif
        return ret;
0160: }
0161:
0162: /**
0163: * Constructor for transaction log
0164: *
0165: */
0166: public Log1() {
0167: }
0168:
    /**
     * Initialize the log.
     *
     * @param db the underlying database that we're logging for
     * @param create true if we're creating this database from scratch,
     *      in which case we can skip any recovery-related activity
     * @param props configuration properties for this log
     */
0176: public void init(Datafile db, boolean create, Properties props)
0177: throws IOException {
        this.db = db;
        this.dbFile = db.file;
        this.blockSize = dbFile.getBlockSize();
        this.dbRootDir = db.dbRootDir;
        this.fileLock = db.getFileLock();
0183: /*{com.quadcap.sql.Datafile-conn.xml-23}
0184: * <config-var>
0185: * <config-name>fastSync</config-name>
0186: * <config-dflt>true</config-dflt>
0187: * <config-desc>If <code>true</code>, omit time-consuming
0188: * sync operations to permit greater throughput.
0189: * </config-desc>
0190: * </config-var>
0191: */
0192: // this.fastSync =
0193: // props.getProperty("fastSync",
0194: // String.valueOf(fastSync)).equalsIgnoreCase("true");
0195: /*{com.quadcap.sql.Datafile-conn.xml-25}
0196: * <config-var>
0197: * <config-name>minSyncInterval</config-name>
0198: * <config-dflt>from config:qed.minSyncInterval</config-dflt>
0199: * <config-desc>Minimum interval between syncs. Set to zero to
0200: * ensure a sync after every transaction.</config-desc>
0201: * </config-var>
0202: */
        this.minSyncInterval = Long.parseLong(props.getProperty(
                "minSyncInterval", String.valueOf(minSyncInterval)));
0205:
0206: /*{com.quadcap.sql.Datafile-conn.xml-26}
0207: * <config-var>
0208: * <config-name>maxSyncInterval</config-name>
0209: * <config-dflt>from config:qed.maxSyncInterval</config-dflt>
0210: * <config-desc>Maximum interval between syncs. Use this to help
0211: * limit the logfile, scratch, and before-images files.
0212: * </config-desc>
0213: * </config-var>
0214: */
        this.maxSyncInterval = Long.parseLong(props.getProperty(
                "maxSyncInterval", String.valueOf(maxSyncInterval)));
0217:
        // Gonna leave this undocumented for now since there's only
        // one instance of this class.... Too many knobs.
        String loggerClass = props.getProperty("loggerClass");
        try {
            this.logger = (Logger) Class.forName(loggerClass).newInstance();
        } catch (Throwable t) {
            // no (or bad) "loggerClass" property: fall back to the default
            this.logger = new Logger1();
        }
        logger.init(this, create, props);
0228:
0229: }
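
    // Illustrative configuration sketch (values are examples only, not defaults):
    //
    //     Properties props = new Properties();
    //     props.setProperty("minSyncInterval", "0");     // sync after every transaction
    //     props.setProperty("maxSyncInterval", "30000"); // bound logfile/before-images growth
    //     log.init(db, create, props);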
0230:
0231: public void start() {
        logSync = new LogSync(this);
0233: logSync.setDaemon(true);
0234: logSync.start();
0235: }
0236:
0237: public void remove() throws IOException {
0238: new File(dbRootDir, "logfile").delete();
0239: }
0240:
0241: /**
0242: * Return the database that we're logging for
0243: */
0244: public Datafile getDatafile() {
0245: return db;
0246: }
0247:
0248: /**
0249: * Return the database root directory
0250: */
0251: public File getDbRootDir() {
0252: return dbRootDir;
0253: }
0254:
    /** count of BEGIN_TRANSACTION entries queued but not yet written by the log thread */
    int pendingBegins = 0;

    /**
     * Add a transaction's log record to the end of the open log file.
     */
    public void addEntry(LogEntry entry) throws IOException {
        if (entry.getCode() == LogEntry.BEGIN_TRANSACTION) {
            pendingBegins++;
        }
        put(entry);
    }
0266:
0267: /**
0268: * Flush and close the log file.
0269: */
0270: public void close() throws IOException {
0271: put(opClose);
0272: try {
0273: closeLatch.acquire();
0274: } catch (InterruptedException ex) {
0275: }
0276: }
0277:
    /**
     * Flush all log records to disk.  The work is queued for the log sync
     * thread rather than performed on the calling thread, since the caller
     * is in a hurry.
     */
0282: public void flushLog() throws IOException {
0283: put(opFlush);
0284: }
0285:
0286: /**
0287: * Perform a checkpoint operation.
0288: */
0289: public void checkpoint() throws IOException {
0290: put(opCheckpoint);
0291: }
0292:
0293: /**
0294: * Wait for all queued ops to be processed by the log sync thread
0295: */
0296: public void sync() throws IOException {
0297: Latch latch = new Latch();
0298: put(new Sync(latch));
0299: try {
0300: latch.acquire();
0301: } catch (InterruptedException ex) {
0302: }
0303: }
0304:
0305: /**
0306: * Transaction rollback.
0307: */
0308: public void rollbackTransaction(Transaction trans)
0309: throws IOException {
0310: put(new Rollback(trans));
0311: sync();
0312: }
0313:
0314: /**
0315: * Statement rollback.
0316: */
0317: public void rollbackStatement(Transaction trans, int stmtId)
0318: throws IOException {
0319: put(new Rollback(trans, stmtId));
0320: sync();
0321: }
0322:
0323: /**
0324: * Restart from a previous state
0325: */
0326: public void restart() throws Exception {
0327: reallyRestart();
0328: }
0329:
0330: //------------------------------------------------------------------
0331:
0332: /**
0333: * Retrieve a row mapping.
0334: */
0335: public final long getRowMap(long rowId) {
0336: long ret = rowId;
0337: if (rowIdMap != null) {
0338: Long mapped = (Long) rowIdMap.get(rowId);
0339: if (mapped != null)
0340: ret = mapped.longValue();
0341: }
0342: return ret;
0343: }
0344:
0345: /**
     * Remember a row mapping {old,new}.  The old row (logRow) is now stored
0347: * in a new place (fileRow), so any stored log entries that refer to
0348: * the old row need to be translated to use the new row.
0349: *
0350: * @param logRow the "old" row
0351: * @param fileRow the "new" row
0352: */
0353: public final void putRowMap(long logRow, long fileRow) {
0354: if (rowIdMap == null)
0355: rowIdMap = new LongMap(256);
0356: rowIdMap.put(logRow, new Long(fileRow));
0357: }
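
    // Illustrative example: if the row logged at id 17 ends up stored at id 42,
    // putRowMap(17, 42) records the move and a later getRowMap(17) returns 42;
    // ids with no recorded mapping are returned unchanged.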
0358:
0359: public final void removeRowMap(long row) {
0360: if (rowIdMap != null)
0361: rowIdMap.remove(row);
0362: }
0363:
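    /**
     * Handle a Flush op on the log sync thread: sync the logger and flush
     * the database's temporary file, as permitted by the syncMap settings.
     */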
0364: void reallyFlush() throws IOException {
0365: //#ifdef DEBUG
0366: if (Trace.bit(18)) {
0367: Debug.println(this + ".reallyFlush()");
0368: }
0369: //#endif
0370: if (checksync(2))
0371: logger.sync();
0372: if (db.tempFile != null) {
0373: db.tempFile.flush(!checksync(3));
0374: }
0375: }
0376:
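    /**
     * Handle a Close op on the log sync thread: tell the LogSync thread to
     * exit and close the before-images file.
     */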
0377: void reallyClose() throws IOException {
0378: try {
0379: if (logSync != null) {
0380: logSync.close();
0381: }
0382: } finally {
0383: logSync = null;
0384: if (bfo != null) {
0385: try {
0386: bfo.close();
0387: } finally {
0388: bfo = null;
0389: }
0390: }
0391: }
0392: }
0393:
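    /**
     * Roll back a transaction on the log sync thread: walk its log entries
     * backwards from the last op, undoing each completed STEP and marking it
     * UNDONE, until the transaction's BEGIN_TRANSACTION record is reached.
     */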
0394: void reallyRollbackTransaction(Transaction tr) throws Exception {
0395: //#ifdef DEBUG
0396: if (Trace.bit(18)) {
0397: Debug.println("reallyRollbackTransaction(" + tr + ")");
0398: }
0399: //#endif
0400: long t = tr.getTransactionId();
0401: LogEntry e = logger.getLastOp(t);
0402: try {
0403: while (e != null) {
0404: //Debug.println(" e = " + e);
0405: if (e.getTransactionId() == t) {
0406: switch (e.getCode()) {
0407: case LogEntry.STEP:
0408: if (e.getRedoState() == LogEntry.DONE) {
0409: try {
0410: e.undo(tr, db);
0411: } catch (Throwable th) {
0412: //#ifdef DEBUG
                                Debug.println("Exception in rollback transaction");
0415: Debug.print(th);
0416: //#endif
0417: }
0418: logger.setRedoState(e, LogEntry.UNDONE);
0419: }
0420: break;
0421: case LogEntry.BEGIN_TRANSACTION:
0422: e = null;
0423: break;
0424: default:
0425: break;
0426: }
0427: }
            e = (e == null || e.getPrev() < 0) ? null : logger.getPrevOp(e);
0430: }
0431: } finally {
0432: rowIdMap = null;
0433: }
0434: }
0435:
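    /**
     * Roll back a single statement on the log sync thread: walk the
     * transaction's log entries backwards, undoing each completed STEP with
     * the matching statement id, until the statement's BEGIN_STATEMENT
     * record is reached.
     */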
0436: void reallyRollbackStatement(Transaction tr, int s)
0437: throws Exception {
0438: long t = tr.getTransactionId();
0439: LogEntry e = logger.getLastOp(t);
0440: rowIdMap = null;
0441: try {
0442: while (e != null) {
0443: if (e.getStatementId() == s) {
0444: switch (e.getCode()) {
0445: case LogEntry.STEP:
0446: if (e.getRedoState() == LogEntry.DONE) {
0447: try {
0448: e.undo(tr, db);
0449: } catch (Throwable th) {
0450: //#ifdef DEBUG
                                Debug.println("Exception in statement rollback");
0453: Debug.print(th);
0454: //#endif
0455: }
0456: logger.setRedoState(e, LogEntry.UNDONE);
0457: }
0458: break;
0459: case LogEntry.BEGIN_STATEMENT:
0460: //Debug.println("--- end rollbackStatement(" + tr + "," +
0461: // s + ")");
0462: return;
0463: default:
0464: break;
0465: }
0466: }
0467: e = logger.getPrevOp(e);
0468: }
0469: } finally {
0470: rowIdMap = null;
0471: }
0472: }
0473:
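    /**
     * Scan the log from the beginning, recording in <code>t</code> the
     * transaction id of every COMMIT record encountered.
     *
     * @return the last log entry scanned, or null if the log is empty
     */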
0474: LogEntry scanLog(LongMap t) throws IOException {
0475: LogEntry op = logger.getFirstOp();
0476: LogEntry last = op;
0477: for (; op != null; last = op, op = logger.getNextOp()) {
0478: LogEntry e = op;
0479: if (e != null && e.getCode() == LogEntry.COMMIT) {
0480: //Debug.println(" [T:" + e.getTransactionId() + " committed]");
0481: t.put(e.getTransactionId(), "");
0482: }
0483: }
0484: return last;
0485: }
0486:
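    /**
     * Crash recovery.  First scan the log to find committed transactions;
     * then walk backwards from the last entry, undoing completed STEPs that
     * precede the checkpoint and belong to uncommitted transactions; then
     * walk forwards, redoing completed STEPs that belong to committed
     * transactions.  Finally reset the log and, if any work was done,
     * schedule a checkpoint.
     */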
0487: void reallyRestart() throws Exception {
0488: //#ifdef DEBUG
0489: if (Trace.bit(18)) {
0490: Debug.println(toString() + ".reallyRestart()");
0491: }
0492: //#endif
0493: db.getTempFile(false); // recover the scratch file.
0494: Transaction t = db.makeTransaction(false);
0495: LongMap map = new LongMap(32);
0496: LogEntry last = scanLog(map);
0497: int q = 0;
0498: int checkpointPosition = logger.getCheckpoint();
0499: int endPosition = logger.getEnd();
0500: LogEntry op = last;
0501: recovering = true;
0502: try {
0503: for (; op != null; op = logger.getPrevOp(op)) {
0504: LogEntry e = op;
0505: if (e != null && e.getCode() == LogEntry.STEP
0506: && e.getPosition() < checkpointPosition
0507: && map.get(e.getTransactionId()) == null) {
0508: if (e.getRedoState() == LogEntry.DONE) {
0509: //#ifdef DEBUG
0510: if (Trace.bit(18)) {
0511: Debug.println("UNDO[" + e + "]");
0512: }
0513: //#endif
0514: if (q++ == 0) {
                            Debug.println("Undoing in-progress transactions...");
0517: }
0518: e.undo(t, db);
0519: }
0520: }
0521: }
0522: rowIdMap = null;
0523:
0524: int p = 0;
            for (op = logger.getFirstOp(); op != null; op = logger.getNextOp()) {
0527: LogEntry e = op;
0528: if (e.getPosition() >= endPosition)
0529: break;
0530: if (e != null && map.get(e.getTransactionId()) != null) {
0531: if (e.getCode() == LogEntry.STEP) {
0532: if (e.getRedoState() == LogEntry.DONE) {
0533: if (p++ == 0) {
                                Debug.println("Restoring committed transactions: " + map);
0537: }
0538: //#ifdef DEBUG
0539: if (Trace.bit(18)) {
0540: Debug.println("REDO[" + e + "]");
0541: }
0542: //#endif
0543: e.redo(t, db);
0544: }
0545: }
0546: }
0547: }
0548: rowIdMap = null;
0549:
0550: if (p > 0 || q > 0) {
0551: Debug.println("Recovery complete: " + p + " redos, "
0552: + q + " undos");
0553: }
0554: logger.reset();
0555: logger.sync();
0556: if (p > 0 || q > 0) {
0557: checkpoint();
0558: }
0559: } finally {
0560: recovering = false;
0561: }
0562: }
0563:
0564: long lastCheckpoint = System.currentTimeMillis();
0565:
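    /**
     * Checkpoint if enough time has passed since the last one: after
     * minSyncInterval when no transactions are active, after maxSyncInterval
     * otherwise.
     */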
0566: void maybeCheckpoint() throws IOException {
0567: long now = System.currentTimeMillis();
0568: boolean idle = logger.getActiveTransactionCount() == 0;
0569: if (idle)
0570: rowIdMap = null;
0571: long interval = idle ? minSyncInterval : maxSyncInterval;
0572: if (now - lastCheckpoint >= interval) {
0573: //#ifdef DEBUG
0574: if (Trace.bit(25)) {
0575: Debug.println("[" + entryCount + BlockStore.rw()
0576: + "] interval " + (now - lastCheckpoint)
0577: + " ms," + (idle ? " IDLE" : "")
0578: + " checkpoint now");
0579: }
0580: //#endif
0581:
0582: reallyCheckpoint();
0583:
0584: //#ifdef DEBUG
0585: if (Trace.bit(25)) {
0586: Debug.println("checkpoint done");
0587: }
0588: //#endif
0589: }
0590: }
0591:
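    /**
     * Handle a Checkpoint op on the log sync thread: flush the database root
     * and block file, checkpoint and sync the logger (truncating the log when
     * no transactions are active or pending), checkpoint the database, and
     * notify its checkpoint handler.
     */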
0592: void reallyCheckpoint() throws IOException {
0593: try {
0594: db.flushRoot();
0595: dbFile.flush(!checksync(4));
0596: logger.checkpoint();
0597:
0598: int numTrans = logger.getActiveTransactionCount();
0599: boolean truncate = pendingBegins == 0 && numTrans == 0;
0600: if (truncate) {
0601: logger.reset();
0602: }
0603: logger.sync();
0604: db.checkpoint(truncate, !checksync(6));
0605: db.checkpointHandler(logger.getActiveTransactions());
0606: dbFile.clearModified();
0607: } finally {
0608: lastCheckpoint = System.currentTimeMillis();
0609: rowIdMap = null;
0610: }
0611: //#ifdef DEBUG
0612: if (Trace.bit(21)) {
0613: Debug.println("AFTER Log1.reallyCheckpoint: ["
0614: + logger.getActiveTransactionCount() + "]");
0615: }
0616: if (false) {
0617: long sum = 0;
0618: long blk = dbFile.getBlockSize();
0619: for (long x = 0; x < dbFile.getSize(); x += blk) {
0620: Block b = dbFile.getBlock(x / blk);
0621: for (int ix = 0; ix < blk; ix += 8) {
0622: sum += b.readLong(ix);
0623: }
0624: b.decrRefCount();
0625: }
0626: Debug.println("CHECKPOINT: CHECKSUM(" + dbFile.getSize()
0627: + ") bytes = " + sum);
0628: }
0629: //#endif
0630: }
0631:
0632: //#ifdef DEBUG
0633: int entryCount = 0;
0634:
0635: //#endif
0636:
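    /**
     * Write a log entry on the log sync thread, decrementing pendingBegins
     * when a BEGIN_TRANSACTION record is actually written.  If the log file
     * is full, abort the oldest active transaction to free space and retry.
     */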
0637: public void reallyAddEntry(LogEntry entry) throws IOException {
0638: //#ifdef DEBUG
0639: entryCount++;
0640: if (Trace.bit(16)) {
            Debug.println(toString() + ".reallyAddEntry(" + entry + ")");
0644: }
0645: //#endif
0646: try {
0647: logger.put(entry);
0648: if (entry.getCode() == LogEntry.BEGIN_TRANSACTION) {
0649: pendingBegins--;
0650: }
0651: } catch (IOException ex) {
0652: if (ex.toString().indexOf("full") > 0) { // XXX need better way
0653: abortOldestTransaction(entry);
0654: logger.put(entry);
0655: } else {
0656: throw ex;
0657: }
0658: }
0659: }
0660:
0661: private void sortEntry(long tId, List save, List discard, LogEntry e) {
0662: if (e.getTransactionId() == tId) {
0663: discard.add(e);
0664: } else {
0665: save.add(e);
0666: }
0667: }
0668:
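    /**
     * Free log space by aborting the oldest active transaction: drain the
     * pending entries from the channel, undo and discard those belonging to
     * the doomed transaction, roll it back, notify its observer, checkpoint,
     * and finally re-handle the saved entries that belong to other
     * transactions.
     */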
0669: private final void abortOldestTransaction(LogEntry entry)
0670: throws IOException {
0671: long tId = logger.getOldestTransaction();
0672: Transaction t = db.findTransaction(tId);
0673: if (t == null) {
0674: Debug.println("Log full, will reset log");
0675: logger.reset();
0676: // XXX should this ever happen, we might
0677: // XXX be better off if we aborted all the active transactions
0678: } else {
0679: ArrayList save = new ArrayList();
0680: Debug.println("Log full, will abort " + t);
0681: try {
0682: // First retrieve all of the pending log entries. There
0683: // may be some for the doomed transaction -- we'll undo
0684: // those first.
0685: ArrayList discard = new ArrayList();
0686: LogEntry e;
0687: while ((e = (LogEntry) channel.poll(0)) != null) {
0688: sortEntry(tId, save, discard, e);
0689: }
0690: sortEntry(tId, save, discard, entry);
0691: Debug.println("discarding " + discard.size()
0692: + " completed log entries");
0693: for (int i = discard.size() - 1; i >= 0; i--) {
0694: e = (LogEntry) discard.get(i);
0695: try {
0696: e.undo(t, db);
0697: } catch (Throwable th) {
0698: //#ifdef DEBUG
0699: Debug.println("Error during abort oldest");
0700: Debug.print(th);
0701: //#endif
0702: }
0703: }
0704: reallyRollbackTransaction(t);
0705: } catch (IOException ex1) {
0706: throw ex1;
0707: } catch (Exception ex2) {
0708: //#ifdef DEBUG
0709: Debug.print(ex2);
0710: //#endif
0711: throw new DatafileException(ex2);
0712: }
0713: TransactionObserver obs = t.getObserver();
0714: if (obs != null) {
0715: obs.abort(t);
0716: }
0717: reallyCheckpoint();
0718: // After we've cleaned up the mess from the aborted transaction,
0719: // we need to handle all of the queued log entries that we
0720: // consumed.
0721: Debug.println("handling " + save.size()
0722: + " saved log entries");
0723: for (int i = 0; i < save.size(); i++) {
0724: LogEntry e = (LogEntry) save.get(i);
0725: try {
                    e.handle(this);
0727: } catch (Throwable th) {
0728: Debug.print(th);
0729: }
0730: }
0731: }
0732: }
0733:
    /**
     * XXXX Problem: if the caller already holds the file lock and calls this
     * when the channel is full, it may block.  That would be bad, because the
     * log thread might in turn block trying to acquire the lock, leading to
     * deadlock.
     */
0739: public void put(LogEntry h) throws IOException {
0740: while (true) {
0741: try {
0742: channel.put(h);
0743: } catch (InterruptedException ex) {
0744: continue;
0745: }
0746: return;
0747: }
0748: }
0749:
0750: /**
0751: * Inner class to bind Flush op to 'log.reallyFlush()' method.
0752: */
0753: public static class Flush extends LogEntry {
0754: public Flush() {
            super(FLUSH);
0756: }
0757:
0758: public void handle(Log log) throws IOException {
0759: ((Log1) log).reallyFlush();
0760: }
0761: }
0762:
0763: /**
0764: * A single static instance of this op is all we need.
0765: */
0766: static Flush opFlush = new Flush();
0767:
0768: /**
0769: * Inner class to bind Checkpoint op to 'log.reallyCheckpoint' method
0770: */
0771: public static class Checkpoint extends LogEntry {
0772: public Checkpoint() {
            super(CHECKPOINT);
0774: }
0775:
0776: public void handle(Log log) throws IOException {
0777: ((Log1) log).reallyCheckpoint();
0778: }
0779: }
0780:
0781: static Checkpoint opCheckpoint = new Checkpoint();
0782:
0783: /**
0784: * Inner class to bind Close op to 'log.reallyClose()' method
0785: */
0786: public static class Close extends LogEntry {
0787: public Close() {
            super(CLOSE);
0789: }
0790:
0791: public void handle(Log log) throws IOException {
0792: ((Log1) log).reallyClose();
0793: }
0794: }
0795:
0796: /**
0797: * A single static instance of this op is all we need.
0798: */
0799: static Close opClose = new Close();
0800:
0801: /**
0802: * Inner class for transaction/statement rollback
0803: */
0804: public class Rollback extends LogEntry {
0805: Transaction t;
0806:
0807: public Rollback(Transaction t) {
            super(t.getTransactionId(), ROLLBACK);
            this.t = t;
0810: }
0811:
0812: public Rollback(Transaction t0, int s) {
            super(t0.getTransactionId(), s, ROLLBACK);
            this.t = t0;
0815: }
0816:
0817: public void handle(Log log) throws Exception {
0818: if (stmtId == -1) {
0819: reallyRollbackTransaction(t);
0820: } else {
0821: reallyRollbackStatement(t, stmtId);
0822: }
0823: }
0824: }
0825:
0826: /**
0827: * Inner class for log thread syncing
0828: */
0829: public class Sync extends LogEntry {
0830: Latch latch;
0831:
0832: public Sync(Latch latch) {
            super(SYNC);
            this.latch = latch;
0835: }
0836:
0837: public void handle(Log log) throws Exception {
0838: latch.release();
0839: }
0840: }
0841:
0842: /**
     * Private inner class which implements a single-threaded log file
     * writer, using a Channel.
0845: */
0846: class LogSync extends Thread {
0847: boolean closeMe = false;
0848: Log1 log = null;
0849:
0850: LogSync(Log1 log) {
            super("Log Sync");
            this.log = log;
0853: }
0854:
0855: public void close() {
0856: closeMe = true;
0857: }
0858:
0859: public void run() {
0860: try {
0861: while (!closeMe) {
0862: Object obj = channel.poll(500);
0863: if (obj != null) {
0864: // once we've got something to do, keep the lock
0865: // until we're finished.
0866: synchronized (fileLock) {
0867: while (obj != null) {
0868: try {
0869: //#ifdef DEBUG
0870: if (Trace.bit(17)) {
0871: Debug.println("PRE [" + obj
0872: + "].handle()");
0873: }
0874: //#endif
0875: ((LogEntry) obj).handle(log);
0876: //#ifdef DEBUG
0877: if (Trace.bit(15)) {
0878: Debug.println("POST [" + obj
0879: + "].handle()");
0880: }
0881: //#endif
0882: } catch (IOException ex) {
                                    Debug.println("LogSync: Got exception in ["
                                            + obj + "].handle()");
0887: Debug.print(ex);
0888: } catch (Throwable t) {
                                    Debug.println("LogSync: Got exception in ["
                                            + obj + "].handle()");
0893: Debug.print(t);
0894: }
0895: obj = channel.poll(0);
0896: }
0897: if (!closeMe) {
0898: log.maybeCheckpoint();
0899: }
0900: }
0901: }
0902: }
0903: log = null;
0904: if (logger != null)
0905: logger.close();
0906: } catch (InterruptedException ex) {
0907: Debug.print(ex);
0908: } catch (Throwable t) {
0909: Debug.print(t);
0910: } finally {
0911: //#ifdef DEBUG
0912: //Debug.println("Log Sync thread exiting **** ");
0913: //#endif
0914: closeLatch.release();
0915: }
0916: }
0917:
0918: }
0919:
0920: /**
0921: * Are you logging?
0922: *
0923: * Oh yes.
0924: */
0925: public boolean isLogging() {
0926: return true;
0927: }
0928:
0929: /**
0930: * Are we currently performing recovery?
0931: */
0932: public boolean inRecovery() {
0933: return recovering;
0934: }
0935:
0936: //#ifdef DEBUG
0937: int filepos = 0;
0938: //#endif
0939:
    /** scratch buffer for one before-image record: 8-byte block number + block data */
    byte[] sav = null;

    /**
     * Save a "before" image of block <code>b</code> to the before-images file.
     */
    public void saveBlock(long b) throws IOException {
0946: if (sav == null) {
0947: sav = new byte[blockSize + 8];
0948: }
0949: ByteUtil.putLong(sav, 0, b);
0950: dbFile.store.read(b, sav, 8);
0951: if (bfo == null)
0952: resetBlocks();
0953: bfo.write(sav, 0, sav.length);
0954: bfoActive = true;
0955: //#ifdef DEBUG
0956: if (Trace.bit(19))
0957: Debug.println("saveBlock(" + b + ", "
0958: + Block.signature(sav, 8, blockSize) + " @ "
0959: + filepos + ")");
0960: filepos += sav.length;
0961: //#endif
0962: if (checksync(7)) {
0963: try {
0964: bfo.getFD().sync();
0965: } catch (Throwable t) {
0966: } finally {
0967: bfoActive = false;
0968: }
0969: }
0970: }
0971:
    /**
     * Restore all the "before" images, reading the before-images file from
     * the end backwards so that the earliest saved image of each block is
     * applied last.
     */
0975: public void restoreBlocks() throws IOException {
0976: //#ifdef DEBUG
0977: if (Trace.bit(19)) {
0978: Debug.println(db.toString() + ".restoreBlocks(): size = "
0979: + dbFile.getSize());
0980: }
0981: //#endif
0982: File f = new File(db.getScratchDir(), "before-images");
0983: int siz = blockSize + 8;
0984: if (f.exists() && f.length() >= siz) {
0985: RandomAccessFile bf = new RandomAccessFile(f, "r");
0986: try {
0987: byte[] buf = new byte[siz];
0988: long pos = bf.length() - siz;
0989: while (pos >= 0) {
0990: bf.seek(pos);
                    bf.readFully(buf); // read the whole record; a short read would corrupt the restore
0992: long blk = ByteUtil.getLong(buf, 0);
0993: //#ifdef DEBUG
0994: if (Trace.bit(19)) {
0995: Debug.println(db.toString() + " [RESTORE "
0996: + blk + " @ " + pos + "]");
0997: }
0998: //#endif
0999: dbFile.restoreBlock(blk, buf, 8);
1000: pos -= siz;
1001: }
1002: } finally {
1003: bf.close();
1004: }
1005: }
1006: //#ifdef DEBUG
1007: if (Trace.bit(19)) {
1008: Debug.println(db.toString() + ".restoreBlocks(): size = "
1009: + dbFile.getSize());
1010: }
1011: if (false) {
1012: long sum = 0;
1013: long blk = dbFile.getBlockSize();
1014: for (long x = 0; x < dbFile.getSize(); x += blk) {
1015: Block b = dbFile.getBlock(x / blk);
1016: for (int ix = 0; ix < blk; ix += 8) {
1017: sum += b.readLong(ix);
1018: }
1019: b.decrRefCount();
1020: }
1021: Debug.println("RECOVER: CHECKSUM(" + dbFile.getSize()
1022: + ") bytes = " + sum);
1023: }
1024: //#endif
1025: }
1026:
1027: /**
1028: * Reset the "before" list to be empty
1029: */
1030: public void resetBlocks() throws IOException {
1031: if (bfo != null) {
1032: try {
1033: bfo.close();
1034: } catch (Throwable t) {
1035: } finally {
1036: bfo = null;
1037: }
1038: }
1039: File b = new File(db.getScratchDir(), "before-images");
1040: bfo = new FileOutputStream(b);
1041: bfoActive = false;
1042: //#ifdef DEBUG
1043: if (Trace.bit(19))
1044: Debug.println(db.toString() + ".Log1.resetBlocks() @ "
1045: + filepos + ", dbFile.size = " + dbFile.getSize());
1046: filepos = 0;
1047: //#endif
1048: }
1049:
1050: //#ifdef DEBUG
1051: public String toString() {
1052: String s = getClass().getName();
1053: int x = s.lastIndexOf('.');
1054: if (x >= 0)
1055: s = s.substring(x + 1);
1056: return s;
1057: }
1058: //#endif
1059: }