001: package com.quadcap.sql.file;
002:
003: /* Copyright 1999 - 2003 Quadcap Software. All rights reserved.
004: *
005: * This software is distributed under the Quadcap Free Software License.
006: * This software may be used or modified for any purpose, personal or
007: * commercial. Open Source redistributions are permitted. Commercial
008: * redistribution of larger works derived from, or works which bundle
009: * this software requires a "Commercial Redistribution License"; see
010: * http://www.quadcap.com/purchase.
011: *
012: * Redistributions qualify as "Open Source" under one of the following terms:
013: *
014: * Redistributions are made at no charge beyond the reasonable cost of
015: * materials and delivery.
016: *
017: * Redistributions are accompanied by a copy of the Source Code or by an
018: * irrevocable offer to provide a copy of the Source Code for up to three
019: * years at the cost of materials and delivery. Such redistributions
020: * must allow further use, modification, and redistribution of the Source
021: * Code under substantially the same terms as this license.
022: *
023: * Redistributions of source code must retain the copyright notices as they
024: * appear in each source code file, these license terms, and the
025: * disclaimer/limitation of liability set forth as paragraph 6 below.
026: *
027: * Redistributions in binary form must reproduce this Copyright Notice,
028: * these license terms, and the disclaimer/limitation of liability set
029: * forth as paragraph 6 below, in the documentation and/or other materials
030: * provided with the distribution.
031: *
032: * The Software is provided on an "AS IS" basis. No warranty is
033: * provided that the Software is free of defects, or fit for a
034: * particular purpose.
035: *
036: * Limitation of Liability. Quadcap Software shall not be liable
037: * for any damages suffered by the Licensee or any third party resulting
038: * from use of the Software.
039: */
040:
041: import java.io.BufferedInputStream;
042: import java.io.EOFException;
043: import java.io.File;
044: import java.io.IOException;
045: import java.io.InputStream;
046: import java.io.RandomAccessFile;
047:
048: import java.util.Properties;
049:
050: import com.quadcap.sql.io.ObjectOutputStream;
051: import com.quadcap.sql.io.ObjectInputStream;
052:
053: import com.quadcap.util.collections.LongMap;
054:
055: import com.quadcap.util.Debug;
056: import com.quadcap.util.Util;
057:
058: /**
* A Logger implementation using a fixed-size circular buffer.
060: *
061: * @author Stan Bailes
062: */
063: public class Logger1 implements Logger {
/** The log buffer; created in the init methods and used by every read and write here. */
LogBuffer cb;
/** Buffered stream over cb, replaced each time readEntry(int) is called. */
InputStream cbIn = null;
/** Serializing stream over cb's output stream; created lazily by rput. */
ObjectOutputStream oos = null;
/** Scratch byte buffer that readEntry(int, int) fills with one entry's bytes. */
ByteArrayRandomAccess bra = new ByteArrayRandomAccess();
/** Input stream view of bra. */
RandomAccessInputStream ris = new RandomAccessInputStream(bra);
/** Deserializing stream; the readEntry methods re-point it at either ris or cbIn. */
ObjectInputStream ois = new ObjectInputStream(ris);
/** Eight-byte scratch array; no method in this class references it. */
byte[] tmp = new byte[8];

/**
 * Transaction map. Slots 0 .. numTrans-1 are in use and are kept sorted
 * by ascending transaction id: beginTransaction inserts in id order and
 * findTransaction binary-searches on the id.
 *  - the first slot is the oldest tracked transaction; when it ends,
 *    endTransaction may run a checkpoint
 *  - the last slot is the newest tracked transaction
 * Completed transactions stay in the map until the next checkpoint.
 */
TransMap[] trans = new TransMap[16];

/**
 * Keeps track of the number of TransMap entries allocated.  Some entries
 * will correspond to completed but not yet checkpointed transactions,
 * so this number may be larger than the number of actual live transactions.
 */
int numTrans;

/**
 * Keep the actual count of live transactions as determined by calls
 * to beginTransaction, endTransaction.
 */
int liveTrans = 0;

/**
 * The starting position of the most recent op we wrote to the file.
 * NOTE(review): no method in this class reads or writes this field.
 */
long prevOp;

/** Maximum log size in bytes; init may override it from the "maxLogSize" property. */
int maxSize = 128 * 1024 * 1024;
/** Buffer size above which endTransaction runs a checkpoint; init may override it from "minLogSize". */
int minSize = 128 * 1024;

/** The owning log; set only by init(Log, boolean, Properties). */
Log myLog;
/** The datafile obtained from myLog; passed to LogEntry.discard during checkpoint. */
Datafile db;
105:
/**
 * Package-private constructor; does nothing.  The log buffer (cb) is only
 * created by the init methods, so one of them must be called before the
 * logger is used.
 */
Logger1() {
}
108:
109: public void init(Log log, boolean create, Properties props)
110: throws IOException {
111: this .myLog = log;
112: this .db = log.getDatafile();
113: this .cb = new LogBuffer();
114: // XXX better would be a call to Datafile.getProperty("logfile")
115: /*{com.quadcap.sql.Datafile-conn.xml-29}
116: * <config-var>
117: * <config-name>maxLogSize</config-name>
118: * <config-dflt>128 M bytes</config-dflt>
119: * <config-desc>For loggers which support rollback, the maximum
120: * size of the rollback log. Once this maximum is reached, it
121: * may be necessary to abort the oldest transaction and checkpoint
122: * in an attempt to regain log space.</config-desc>
123: * </config-var>
124: */
125: maxSize = Integer.parseInt(props.getProperty("maxLogSize", ""
126: + maxSize));
127:
128: /*{com.quadcap.sql.Datafile-conn.xml-29}
129: * <config-var>
130: * <config-name>minLogSize</config-name>
131: * <config-dflt>128 K bytes</config-dflt>
132: * <config-desc>For loggers which support rollback, the minimum size
133: * at which a logging checkpoint operation will occur. The logging
134: * checkpoint cleans the log of entries which correspond to completed
135: * transactions.</config-desc>
136: * </config-var>
137: */
138: minSize = Integer.parseInt(props.getProperty("minLogSize", ""
139: + minSize));
140:
141: File logfile = new File(db.getDbRootDir(), "logfile");
142: RandomAccessFile raf = new RandomAccessFile(logfile, "rw");
143: FileRandomAccess fra = new FileRandomAccess(raf, maxSize);
144: RandomAccess ra = fra;
145: if (create) {
146: cb.init(ra, maxSize);
147: } else {
148: lastSize = ra.size();
149: cb.init(ra, props);
150: }
151: }
152:
153: public void init(File file) throws IOException {
154: RandomAccessFile raf = new RandomAccessFile(file, "rw");
155: FileRandomAccess fra = new FileRandomAccess(raf, raf.length());
156: RandomAccess ra = fra;
157: this .cb = new LogBuffer();
158: lastSize = ra.size();
159: cb.init(ra, new Properties());
160: }
161:
162: /**
163: * Inner class used to to track active transactions.
164: */
165: class TransMap {
166: long transId;
167: /**
168: * If >0, the position of the first op in the transaction.
169: * This will generally be the BEGIN_TRANSACTION op.
170: */
171: int bufStart;
172: /**
173: * If >0, the position of the most recently written op in the
174: * transaction.
175: */
176: int bufEnd;
177: boolean complete;
178:
179: public void init(long t, int p) {
180: this .transId = t;
181: this .bufStart = p;
182: this .bufEnd = p;
183: this .complete = false;
184: }
185:
186: public void init(TransMap t) {
187: this .transId = t.transId;
188: this .bufStart = -1;
189: this .bufEnd = -1;
190: this .complete = t.complete;
191: }
192:
193: public void copy(TransMap t) {
194: this .transId = t.transId;
195: this .bufStart = t.bufStart;
196: this .bufEnd = t.bufEnd;
197: this .complete = t.complete;
198: }
199:
200: public String toString() {
201: return "TransMap[" + transId + ": " + bufStart + "-"
202: + bufEnd + (complete ? " (COMPLETE)" : "") + "]";
203: }
204: }
205:
206: public String toString() {
207: StringBuffer sb = new StringBuffer("Logger1 {"
208: + cb.getCheckpoint() + ", " + cb.getEnd() + "} ");
209: for (int i = 0; i < trans.length; i++) {
210: TransMap tm = trans[i];
211: if (tm != null && !tm.complete) {
212: sb.append(tm.toString());
213: }
214: }
215: return sb.toString();
216: }
217:
/**
 * Return the checkpoint position reported by the underlying log buffer.
 *
 * @return the value of cb.getCheckpoint()
 */
public int getCheckpoint() {
    return cb.getCheckpoint();
}
221:
/**
 * Return the end position reported by the underlying log buffer.  rput
 * uses this same value as the position of the next op it writes.
 *
 * @return the value of cb.getEnd()
 */
public int getEnd() {
    return cb.getEnd();
}
225:
226: public int rput(LogEntry op) throws IOException {
227: int pos = cb.getEnd();
228: op.setPosition(pos);
229:
230: long id = op.getTransactionId();
231: int tx = findTransaction(id);
232: if (tx >= 0) {
233: TransMap tm = trans[tx];
234: op.setPrev(tm.bufEnd);
235: tm.bufEnd = pos;
236: }
237:
238: try {
239: if (oos == null) {
240: oos = new ObjectOutputStream(cb.getOutputStream());
241: }
242: oos.writeObject(op);
243: oos.flush();
244: } catch (IOException ex) {
245: if (tx >= 0) {
246: trans[tx].bufEnd = op.getPrev();
247: }
248: throw ex;
249: }
250: return pos;
251: }
252:
253: public void put(LogEntry op) throws IOException {
254: int pos = rput(op);
255: switch (op.getCode()) {
256: case LogEntry.BEGIN_TRANSACTION:
257: beginTransaction(op.getTransactionId(), pos);
258: break;
259: case LogEntry.COMMIT:
260: endTransaction(op.getTransactionId());
261: break;
262: }
263: //Debug.println("Logger1.put(" + op + ") [" + pos +
264: // "] ----------------------");
265: }
266:
267: public void setRedoState(LogEntry op, int state) throws IOException {
268: int pos = op.getPosition();
269: //Debug.println("setRedoState(" + op + ", " + state + ")");
270: cb.writeByte(pos + 1, (byte) state);
271: op.redoState = state;
272: }
273:
274: public LogEntry getLastOp(long transId) throws IOException {
275: LogEntry ret = null;
276: int tx = findTransaction(transId);
277: if (tx >= 0) {
278: TransMap tm = trans[tx];
279: ret = readEntry(tm.bufEnd);
280: }
281: //#ifdef DEBUG
282: if (ret == null) {
283: Debug.println(Util.stackTrace());
284: Debug.println("getLastOp(" + transId + ") = null, trans:");
285: for (int i = 0; i < numTrans; i++) {
286: Debug.println("trans[" + i + "] = " + trans[i]);
287: }
288: }
289: //#endif
290: return ret;
291: }
292:
293: public LogEntry getPrevOp(LogEntry op) throws IOException {
294: int pos = op.getPrev();
295: int len = op.getPosition() - pos;
296: LogEntry ret = readEntry(pos, len);
297: return ret;
298: }
299:
300: public LogEntry getFirstOp() throws IOException {
301: LogEntry ret = readEntry(cb.getBegin());
302: return ret;
303: }
304:
/**
 * Read the next entry from wherever the shared object input stream is
 * currently positioned (as left by the previous read).
 *
 * @return the next entry, or null at end of stream
 * @throws IOException if reading the entry fails
 */
public LogEntry getNextOp() throws IOException {
    return readEntry();
}
308:
/**
 * Delegate to the log buffer's sync().
 *
 * @throws IOException if the buffer's sync fails
 */
public void sync() throws IOException {
    cb.sync();
}
312:
/**
 * Close the underlying log buffer.  The object streams held by this
 * class (oos, cbIn) are not closed separately here.
 *
 * @throws IOException if closing the buffer fails
 */
public void close() throws IOException {
    cb.close();
}
317:
318: long lastSize = 0;
319:
320: public void reset() throws IOException {
321: cb.reset();
322: if (lastSize - cb.size() > 100 * 1000) {
323: cb.truncate();
324: }
325: lastSize = cb.size();
326: }
327:
328: public long getOldestTransaction() {
329: if (numTrans > 0) {
330: return trans[0].transId;
331: } else {
332: return -1;
333: }
334: }
335:
/**
 * Return the number of live transactions, i.e. the count incremented by
 * beginTransaction and decremented by endTransaction.
 *
 * @return the current value of liveTrans
 */
public int getActiveTransactionCount() {
    return liveTrans;
}
339:
340: public LongMap getActiveTransactions() {
341: LongMap map = new LongMap(16);
342: for (int i = 0; i < numTrans; i++) {
343: TransMap t = trans[i];
344: if (!t.complete) {
345: map.put(t.transId, "");
346: }
347: }
348: return map;
349: }
350:
351: public void checkpoint() throws IOException {
352: int offset = 0;
353: for (int i = 0; i < numTrans; i++) {
354: TransMap t = trans[i];
355: if (t.complete) {
356: offset++;
357: } else if (offset > 0) {
358: trans[i - offset].init(t);
359: }
360: }
361: //#ifdef DEBUG
362: if (Trace.bit(22)) {
363: Debug
364: .println("BEGIN checkpoint [" + cb.getBegin() + "-"
365: + cb.getEnd() + " (" + numTrans
366: + " transactions)]");
367: }
368: //#endif
369: numTrans -= offset;
370: //Debug.println("Checkpoint: -" + offset + " = " + numTrans);
371: LogEntry entry = readEntry(0);
372: cb.reset();
373: if (numTrans > 0) {
374: while (entry != null) {
375: boolean keep = false;
376: long id = entry.getTransactionId();
377: int tx = findTransaction(id);
378: if (tx >= 0) {
379: TransMap tm = trans[tx];
380: keep = !tm.complete;
381: if (keep) {
382: if (tm.bufStart < 0)
383: tm.bufStart = cb.getEnd();
384: }
385: }
386: if (keep) {
387: //#ifdef DEBUG
388: if (Trace.bit(23)) {
389: Debug.println(" [" + cb.getEnd() + "] -> "
390: + entry);
391: }
392: //#endif
393: rput(entry);
394: } else {
395: //#ifdef DEBUG
396: if (Trace.bit(23)) {
397: Debug.println(" [" + cb.getEnd()
398: + "] -> DISCARD: " + entry);
399: }
400: //#endif
401: entry.discard(db);
402: }
403: entry = readEntry();
404: }
405: }
406: cb.checkpoint();
407: //#ifdef DEBUG
408: if (Trace.bit(22)) {
409: Debug
410: .println("END checkpoint [" + cb.getBegin() + "-"
411: + cb.getEnd() + " (" + numTrans
412: + " transactions)]");
413: }
414: //#endif
415: }
416:
417: //--------------------------------- private stuff
418:
419: private final void beginTransaction(long transId, int pos) {
420: if (numTrans >= trans.length) {
421: int newcap = numTrans + (numTrans / 2) + 2;
422: trans = (TransMap[]) Util.checkCapacity(trans, newcap);
423: }
424: TransMap t = trans[numTrans];
425: if (t == null) {
426: t = new TransMap();
427: trans[numTrans] = t;
428: }
429:
430: // A race condition exists -- this might *not* be the newest trans,
431: // and we want to keep the 'trans' array ordered properly, so search
432: // backwards from the end to make sure we put this fellow in the
433: // right place.
434: for (int i = numTrans - 1; i >= 0; i--) {
435: if (trans[i].transId > transId) {
436: trans[i + 1].copy(trans[i]);
437: t = trans[i];
438: } else {
439: break;
440: }
441: }
442: numTrans++;
443: //Debug.println("Begin T:" + transId + " = " + numTrans);
444: liveTrans++;
445: t.init(transId, pos);
446: //Debug.println("beginTransaction(" + transId + "): " + numTrans);
447: }
448:
449: private final void endTransaction(final long transId)
450: throws IOException {
451: //Debug.println("endTransaction(" + transId + "): " + numTrans);
452: int pos = findTransaction(transId);
453: if (pos >= 0) {
454: // Mark the transaction as complete. We don't decrement
455: // numTrans here; we wait until the next checkpoint, where
456: // all of the completed transactions can be reclaimed at
457: // once.
458: trans[pos].complete = true;
459: liveTrans--;
460: if (pos == 0 && cb.size() > minSize) {
461: checkpoint();
462: }
463: }
464: }
465:
466: /**
467: * Find the TransMap entry for the given transaction id
468: */
469: int findTransaction(long transId) {
470: int lo = 0;
471: int hi = numTrans - 1;
472: while (hi >= lo) {
473: int mid = (hi + lo) / 2;
474: TransMap t = trans[mid];
475: if (t.transId == transId) {
476: return mid;
477: }
478: if (t.transId < transId) {
479: lo = mid + 1;
480: } else {
481: hi = mid - 1;
482: }
483: }
484: return -1;
485: }
486:
487: private final LogEntry readEntry(int pos, int len)
488: throws IOException {
489: if (pos < 0)
490: return null;
491: bra.resize(len);
492: byte[] buf = bra.getBytes();
493: cb.read(pos, buf, 0, len);
494: ris.setPosition(0);
495: ois.setInputStream(ris);
496: ois.setPosition(pos);
497:
498: LogEntry ret = readEntry();
499: return ret;
500: }
501:
502: private final LogEntry readEntry(int pos) throws IOException {
503: if (pos < 0)
504: return null;
505: cbIn = new BufferedInputStream(cb.getInputStream(pos));
506: ois.setInputStream(cbIn);
507: ois.setPosition(pos);
508:
509: LogEntry ret = readEntry();
510: return ret;
511: }
512:
513: private final LogEntry readEntry() throws IOException {
514: Object obj = null;
515: try {
516: int pos = (int) ois.getPosition();
517: LogEntry ret = (LogEntry) (obj = ois.readObject());
518: if (ret != null) {
519: ret.setPosition(pos);
520: }
521: return ret;
522: } catch (EOFException ex) {
523: return null;
524: } catch (ClassCastException ez) {
525: throw new DatafileException(ez);
526: } catch (ClassNotFoundException ex) {
527: throw new DatafileException(ex);
528: }
529: }
530: }
|