001: /*
002: * Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
003: * (http://h2database.com/html/license.html).
004: * Initial Developer: H2 Group
005: */
006: package org.h2.log;
007:
008: import java.io.IOException;
009: import java.sql.SQLException;
010:
011: import org.h2.api.DatabaseEventListener;
012: import org.h2.constant.ErrorCode;
013: import org.h2.engine.Constants;
014: import org.h2.engine.Database;
015: import org.h2.engine.Session;
016: import org.h2.message.Message;
017: import org.h2.message.Trace;
018: import org.h2.store.DataPage;
019: import org.h2.store.DiskFile;
020: import org.h2.store.FileStore;
021: import org.h2.store.Record;
022: import org.h2.store.Storage;
023: import org.h2.util.FileUtils;
024: import org.h2.util.MathUtils;
025: import org.h2.util.ObjectArray;
026:
027: /**
028: * Each transaction log file contains a number of log records.
029: *
030: * Header format:
031: * <pre>
032: * int logId (<0 means ignore: rolled back already)
033: * int firstUncommittedLogRecordId (-1 if none)
034: * int firstUnwrittenLogRecordId (-1 if none)
035: * </pre>
036: *
037: * Record format:
038: * <pre>
039: * int block size
040: * byte 'D' (delete) / 'I' (insert) / 'C' (commit) /
041: * 'R' (rollback) / 'P' (prepare commit) / 'T' (truncate)
042: * int session
043: * [delete/insert only:]
044: * int storage
045: * int record.pos
046: * int record.blockCount
047: * [prepare commit only:]
048: * string transaction
049: * </pre>
050: */
051: public class LogFile {
052:
053: private static final int BUFFER_SIZE = 8 * 1024;
054: public static final int BLOCK_SIZE = 16;
055:
056: private LogSystem logSystem;
057: private Database database;
058: private int id;
059: private String fileNamePrefix;
060: private String fileName;
061: private FileStore file;
062: private int bufferPos;
063: private byte[] buffer;
064: private ObjectArray unwritten;
065: private DataPage rowBuff;
066: private int pos = LogSystem.LOG_WRITTEN;
067: private int firstUncommittedPos = LogSystem.LOG_WRITTEN;
068: private int firstUnwrittenPos = LogSystem.LOG_WRITTEN;
069:
070: LogFile(LogSystem log, int id, String fileNamePrefix)
071: throws SQLException {
072: this .logSystem = log;
073: this .database = log.getDatabase();
074: this .id = id;
075: this .fileNamePrefix = fileNamePrefix;
076: fileName = getFileName();
077: file = log.getDatabase().openFile(fileName,
078: log.getAccessMode(), false);
079: rowBuff = log.getRowBuffer();
080: buffer = new byte[BUFFER_SIZE];
081: unwritten = new ObjectArray();
082: try {
083: readHeader();
084: if (!log.getDatabase().getReadOnly()) {
085: writeHeader();
086: }
087: pos = getBlock();
088: firstUncommittedPos = pos;
089: } catch (SQLException e) {
090: close(false);
091: throw e;
092: }
093: }
094:
095: static LogFile openIfLogFile(LogSystem log, String fileNamePrefix,
096: String fileName) throws SQLException {
097: if (!fileName.endsWith(Constants.SUFFIX_LOG_FILE)) {
098: return null;
099: }
100: if (!FileUtils.fileStartsWith(fileName, fileNamePrefix + ".")) {
101: return null;
102: }
103: String s = fileName.substring(fileNamePrefix.length() + 1,
104: fileName.length() - Constants.SUFFIX_LOG_FILE.length());
105: for (int i = 0; i < s.length(); i++) {
106: if (!Character.isDigit(s.charAt(i))) {
107: return null;
108: }
109: }
110: int id = Integer.parseInt(s);
111: if (!FileUtils.exists(fileName)) {
112: // the file could have been deleted by now (by the DelayedFileDeleter)
113: return null;
114: }
115: return new LogFile(log, id, fileNamePrefix);
116: }
117:
118: public String getFileName() {
119: return fileNamePrefix + "." + id + Constants.SUFFIX_LOG_FILE;
120: }
121:
122: public int getId() {
123: return id;
124: }
125:
126: private int getBlock() throws SQLException {
127: if (file == null) {
128: throw Message
129: .getSQLException(ErrorCode.SIMULATED_POWER_OFF);
130: }
131: return (int) (file.getFilePointer() / BLOCK_SIZE);
132: }
133:
134: private void writeBuffer(DataPage buff, Record rec)
135: throws SQLException {
136: if (file == null) {
137: throw Message
138: .getSQLException(ErrorCode.SIMULATED_POWER_OFF);
139: }
140: int size = MathUtils.roundUp(buff.length()
141: + buff.getFillerLength(), BLOCK_SIZE);
142: int blockCount = size / BLOCK_SIZE;
143: buff.fill(size);
144: buff.setInt(0, blockCount);
145: buff.updateChecksum();
146: // IOLogger.getInstance().logWrite(this.fileName,
147: // file.getFilePointer(), buff.length());
148: if (rec != null) {
149: unwritten.add(rec);
150: }
151: if (buff.length() + bufferPos > buffer.length) {
152: // the buffer is full
153: flush();
154: }
155: if (buff.length() >= buffer.length) {
156: // special case really long write request: write it without buffering
157: file.write(buff.getBytes(), 0, buff.length());
158: pos = getBlock();
159: return;
160: }
161: System.arraycopy(buff.getBytes(), 0, buffer, bufferPos, buff
162: .length());
163: bufferPos += buff.length();
164: pos = getBlock() + (bufferPos / BLOCK_SIZE);
165: }
166:
167: void commit(Session session) throws SQLException {
168: DataPage buff = rowBuff;
169: buff.reset();
170: buff.writeInt(0);
171: buff.writeByte((byte) 'C');
172: buff.writeInt(session.getId());
173: writeBuffer(buff, null);
174: if (logSystem.getFlushOnEachCommit()) {
175: flush();
176: }
177: }
178:
179: void prepareCommit(Session session, String transaction)
180: throws SQLException {
181: DataPage buff = rowBuff;
182: buff.reset();
183: buff.writeInt(0);
184: buff.writeByte((byte) 'P');
185: buff.writeInt(session.getId());
186: buff.writeString(transaction);
187: writeBuffer(buff, null);
188: if (logSystem.getFlushOnEachCommit()) {
189: flush();
190: }
191: }
192:
193: private DataPage readPage() throws SQLException {
194: byte[] buff = new byte[BLOCK_SIZE];
195: file.readFully(buff, 0, BLOCK_SIZE);
196: DataPage s = DataPage.create(database, buff);
197: int blocks = Math.abs(s.readInt());
198: if (blocks > 1) {
199: byte[] b2 = new byte[blocks * BLOCK_SIZE];
200: System.arraycopy(buff, 0, b2, 0, BLOCK_SIZE);
201: buff = b2;
202: file.readFully(buff, BLOCK_SIZE, blocks * BLOCK_SIZE
203: - BLOCK_SIZE);
204: s = DataPage.create(database, buff);
205: s.check(blocks * BLOCK_SIZE);
206: } else {
207: s.reset();
208: }
209: return s;
210: }
211:
212: /**
213: * Redo or undo one item in the log file.
214: *
215: * @param undo true if the operation should be undone
216: * @param readOnly if the file is read only
217: * @return true if there are potentially more operations
218: */
219: private boolean redoOrUndo(boolean undo, boolean readOnly)
220: throws SQLException {
221: int pos = getBlock();
222: DataPage in = readPage();
223: int blocks = in.readInt();
224: if (blocks < 0) {
225: return true;
226: } else if (blocks == 0) {
227: truncate(pos);
228: return false;
229: }
230: char type = (char) in.readByte();
231: int sessionId = in.readInt();
232: if (type == 'P') {
233: if (undo) {
234: throw Message
235: .getInternalError("can't undo prepare commit");
236: }
237: String transaction = in.readString();
238: logSystem.setPreparedCommitForSession(this , sessionId, pos,
239: transaction, blocks);
240: return true;
241: } else if (type == 'C') {
242: if (undo) {
243: throw Message.getInternalError("can't undo commit");
244: }
245: logSystem.setLastCommitForSession(sessionId, id, pos);
246: return true;
247: } else if (type == 'R') {
248: if (undo) {
249: throw Message.getInternalError("can't undo rollback");
250: }
251: return true;
252: } else if (type == 'S') {
253: if (undo) {
254: throw Message.getInternalError("can't undo summary");
255: }
256: }
257: if (readOnly && type != 'S') {
258: return true;
259: }
260: if (undo) {
261: if (logSystem.isSessionCommitted(sessionId, id, pos)) {
262: logSystem.removeSession(sessionId);
263: return true;
264: }
265: } else {
266: if (type != 'S') {
267: if (!readOnly) {
268: logSystem.addUndoLogRecord(this , pos, sessionId);
269: }
270: }
271: }
272: int storageId = in.readInt();
273: Storage storage = logSystem.getStorageForRecovery(storageId);
274: DataPage rec = null;
275: int recordId = in.readInt();
276: int blockCount = in.readInt();
277: if (type != 'T') {
278: rec = in.readDataPageNoSize();
279: }
280: switch (type) {
281: case 'S': {
282: int fileType = in.readByte();
283: boolean diskFile;
284: if (fileType == 'D') {
285: diskFile = true;
286: } else if (fileType == 'I') {
287: diskFile = false;
288: } else {
289: // unknown type, maybe linear index file (future)
290: break;
291: }
292: int sumLength = in.readInt();
293: byte[] summary = new byte[sumLength];
294: if (sumLength > 0) {
295: in.read(summary, 0, sumLength);
296: }
297: if (diskFile) {
298: database.getDataFile().initFromSummary(summary);
299: } else {
300: database.getIndexFile().initFromSummary(summary);
301: }
302: break;
303: }
304: case 'T':
305: if (undo) {
306: throw Message.getInternalError("cannot undo truncate");
307: } else {
308: logSystem.addRedoLog(storage, recordId, blockCount,
309: null);
310: storage.setRecordCount(0);
311: storage.getDiskFile().setPageOwner(
312: recordId / DiskFile.BLOCKS_PER_PAGE, -1);
313: logSystem.setLastCommitForSession(sessionId, id, pos);
314: }
315: break;
316: case 'I':
317: if (undo) {
318: logSystem.addRedoLog(storage, recordId, blockCount,
319: null);
320: storage.setRecordCount(storage.getRecordCount() - 1);
321: } else {
322: logSystem.getOrAddSessionState(sessionId);
323: logSystem
324: .addRedoLog(storage, recordId, blockCount, rec);
325: storage.setRecordCount(storage.getRecordCount() + 1);
326: }
327: break;
328: case 'D':
329: if (undo) {
330: logSystem
331: .addRedoLog(storage, recordId, blockCount, rec);
332: storage.setRecordCount(storage.getRecordCount() + 1);
333: } else {
334: logSystem.getOrAddSessionState(sessionId);
335: logSystem.addRedoLog(storage, recordId, blockCount,
336: null);
337: storage.setRecordCount(storage.getRecordCount() - 1);
338: }
339: break;
340: default:
341: throw Message.getInternalError("type=" + type);
342: }
343: return true;
344: }
345:
346: public void redoAllGoEnd() throws SQLException {
347: boolean readOnly = logSystem.getDatabase().getReadOnly();
348: long length = file.length();
349: if (length <= FileStore.HEADER_LENGTH) {
350: return;
351: }
352: try {
353: int max = (int) (length / BLOCK_SIZE);
354: while (true) {
355: pos = getBlock();
356: database.setProgress(
357: DatabaseEventListener.STATE_RECOVER, fileName,
358: pos, max);
359: if ((long) pos * BLOCK_SIZE >= length) {
360: break;
361: }
362: boolean more = redoOrUndo(false, readOnly);
363: if (!more) {
364: break;
365: }
366: }
367: database.setProgress(DatabaseEventListener.STATE_RECOVER,
368: fileName, max, max);
369: } catch (SQLException e) {
370: database.getTrace(Trace.LOG).debug(
371: "Stop reading log file: " + e.getMessage(), e);
372: // wrong checksum (at the end of the log file)
373: } catch (OutOfMemoryError e) {
374: // OutOfMemoryError means not enough memory is allocated to the VM.
375: // this is not necessarily at the end of the log file
376: throw Message.convert(e);
377: } catch (Throwable e) {
378: database.getTrace(Trace.LOG).error(
379: "Error reading log file (non-fatal)", e);
380: // TODO log exception, but mark as 'probably ok'
381: // on power loss, sometime there is garbage at the end of the file
382: // we stop recovering in this case (checksum mismatch)
383: }
384: go(pos);
385: }
386:
387: void go(int pos) throws SQLException {
388: file.seek((long) pos * BLOCK_SIZE);
389: }
390:
391: void undo(int pos) throws SQLException {
392: go(pos);
393: redoOrUndo(true, false);
394: }
395:
396: void flush() throws SQLException {
397: if (bufferPos > 0) {
398: if (file == null) {
399: throw Message
400: .getSQLException(ErrorCode.SIMULATED_POWER_OFF);
401: }
402: file.write(buffer, 0, bufferPos);
403: pos = getBlock();
404: for (int i = 0; i < unwritten.size(); i++) {
405: Record r = (Record) unwritten.get(i);
406: r.setLogWritten(id, pos);
407: }
408: unwritten.clear();
409: bufferPos = 0;
410: long min = (long) pos * BLOCK_SIZE;
411: min = MathUtils.scaleUp50Percent(Constants.FILE_MIN_SIZE,
412: min, Constants.FILE_PAGE_SIZE,
413: Constants.FILE_MAX_INCREMENT);
414: if (min > file.length()) {
415: file.setLength(min);
416: }
417: }
418: }
419:
420: void close(boolean delete) throws SQLException {
421: SQLException closeException = null;
422: try {
423: flush();
424: } catch (SQLException e) {
425: closeException = e;
426: }
427: // continue with close even if flush was not possible (file storage problem)
428: if (file != null) {
429: try {
430: file.close();
431: file = null;
432: if (delete) {
433: database.deleteLogFileLater(fileName);
434: }
435: } catch (IOException e) {
436: if (closeException == null) {
437: closeException = Message.convertIOException(e,
438: fileName);
439: }
440: }
441: file = null;
442: fileNamePrefix = null;
443: }
444: if (closeException != null) {
445: throw closeException;
446: }
447: }
448:
449: void addSummary(boolean dataFile, byte[] summary)
450: throws SQLException {
451: DataPage buff = DataPage.create(database, 256);
452: buff.writeInt(0);
453: buff.writeByte((byte) 'S');
454: buff.writeInt(0);
455: buff.writeInt(0); // storageId
456: buff.writeInt(0); // recordId
457: buff.writeInt(0); // blockCount
458: buff.writeByte((byte) (dataFile ? 'D' : 'I'));
459: if (summary == null) {
460: buff.writeInt(0);
461: } else {
462: buff.checkCapacity(summary.length);
463: buff.writeInt(summary.length);
464: buff.write(summary, 0, summary.length);
465: }
466: writeBuffer(buff, null);
467: }
468:
469: void addTruncate(Session session, int storageId, int recordId,
470: int blockCount) throws SQLException {
471: DataPage buff = rowBuff;
472: buff.reset();
473: buff.writeInt(0);
474: buff.writeByte((byte) 'T');
475: buff.writeInt(session.getId());
476: buff.writeInt(storageId);
477: buff.writeInt(recordId);
478: buff.writeInt(blockCount);
479: writeBuffer(buff, null);
480: }
481:
482: void add(Session session, int storageId, Record record)
483: throws SQLException {
484: record.prepareWrite();
485: DataPage buff = rowBuff;
486: buff.reset();
487: buff.writeInt(0);
488: if (record.getDeleted()) {
489: buff.writeByte((byte) 'D');
490: } else {
491: buff.writeByte((byte) 'I');
492: }
493: buff.writeInt(session.getId());
494: buff.writeInt(storageId);
495: buff.writeInt(record.getPos());
496: int blockCount = record.getBlockCount();
497: buff.writeInt(blockCount);
498: buff.checkCapacity(DiskFile.BLOCK_SIZE * blockCount);
499: record.write(buff);
500: writeBuffer(buff, record);
501: }
502:
503: void setFirstUncommittedPos(int firstUncommittedPos)
504: throws SQLException {
505: this .firstUncommittedPos = firstUncommittedPos;
506: int pos = getBlock();
507: writeHeader();
508: go(pos);
509: }
510:
511: int getFirstUncommittedPos() {
512: return firstUncommittedPos;
513: }
514:
515: private void writeHeader() throws SQLException {
516: file.seek(FileStore.HEADER_LENGTH);
517: DataPage buff = getHeader();
518: file.write(buff.getBytes(), 0, buff.length());
519: }
520:
521: void truncate(int pos) throws SQLException {
522: go(pos);
523: file.setLength((long) pos * BLOCK_SIZE);
524: }
525:
526: private DataPage getHeader() {
527: DataPage buff = rowBuff;
528: buff.reset();
529: buff.writeInt(id);
530: buff.writeInt(firstUncommittedPos);
531: // TODO need to update & use firstUnwrittenPos
532: buff.writeInt(firstUnwrittenPos);
533: buff.fill(3 * BLOCK_SIZE);
534: return buff;
535: }
536:
537: private void readHeader() throws SQLException {
538: DataPage buff = getHeader();
539: int len = buff.length();
540: buff.reset();
541: if (file.length() < FileStore.HEADER_LENGTH + len) {
542: // this is an empty file
543: return;
544: }
545: file.readFully(buff.getBytes(), 0, len);
546: id = buff.readInt();
547: firstUncommittedPos = buff.readInt();
548: firstUnwrittenPos = buff.readInt();
549: }
550:
551: int getPos() {
552: return pos;
553: }
554:
555: public long getFileSize() throws SQLException {
556: return file.getFilePointer();
557: }
558:
559: public void sync() {
560: if (file != null) {
561: file.sync();
562: }
563: }
564:
565: void updatePreparedCommit(boolean commit, int pos, int sessionId,
566: int blocks) throws SQLException {
567: synchronized (database) {
568: int posNow = getBlock();
569: DataPage buff = rowBuff;
570: buff.reset();
571: buff.writeInt(blocks);
572: if (commit) {
573: buff.writeByte((byte) 'C');
574: } else {
575: buff.writeByte((byte) 'R');
576: }
577: buff.writeInt(sessionId);
578: buff.fill(blocks * BLOCK_SIZE);
579: buff.updateChecksum();
580: go(pos);
581: file.write(buff.getBytes(), 0, BLOCK_SIZE * blocks);
582: go(posNow);
583: }
584: }
585:
586: }
|