0001: /**
0002: * com.mckoi.store.JournalledSystem 11 Jun 2003
0003: *
0004: * Mckoi SQL Database ( http://www.mckoi.com/database )
0005: * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
0006: *
0007: * This program is free software; you can redistribute it and/or
0008: * modify it under the terms of the GNU General Public License
0009: * Version 2 as published by the Free Software Foundation.
0010: *
0011: * This program is distributed in the hope that it will be useful,
0012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0014: * GNU General Public License Version 2 for more details.
0015: *
0016: * You should have received a copy of the GNU General Public License
0017: * Version 2 along with this program; if not, write to the Free Software
0018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
0019: *
0020: * Change Log:
0021: *
0022: *
0023: */package com.mckoi.store;
0024:
0025: import java.io.*;
0026: import java.util.HashMap;
0027: import java.util.ArrayList;
0028: import java.util.Collections;
0029: import java.util.Comparator;
0030: import com.mckoi.debug.DebugLogger;
0031: import com.mckoi.debug.Lvl;
0032: import com.mckoi.util.ByteArrayUtil;
0033: import com.mckoi.store.LoggingBufferManager.StoreDataAccessorFactory;
0034:
0035: /**
0036: * Manages a journalling data store management system. All operations are
0037: * written out to a log that can be easily recovered from if a crash occurs.
0038: *
0039: * @author Tobias Downer
0040: */
0041:
0042: class JournalledSystem {
0043:
  /**
   * Set to true for logging behaviour.  When false no journal is kept and
   * resources are accessed directly (see createResource).
   */
  private final boolean ENABLE_LOGGING;

  /**
   * The path to the journal files.
   */
  private final File journal_path;

  /**
   * If the journal system is in read only mode.  In read-only mode no
   * journals may be created, and a pending journal found on disk is an error.
   */
  private final boolean read_only;

  /**
   * The page size (in bytes) reported to resources via getPageSize.
   */
  private final int page_size;

  /**
   * The map of all resources that are available. (resource_name -> Resource)
   */
  private HashMap all_resources;

  /**
   * The unique sequence id counter for this session, used to assign resource
   * ids in createResource.
   */
  private long seq_id;

  /**
   * The archive of journal files currently pending persistence (JournalFile).
   */
  private final ArrayList journal_archives;

  /**
   * The current top journal file (the journal new entries are written to).
   */
  private JournalFile top_journal_file;

  /**
   * The current journal file number.  Monotonically increasing; mapped to a
   * cyclic file name range by newTopJournalFile.
   */
  private long journal_number;

  /**
   * A factory that creates StoreDataAccessor objects used to access the
   * resource with the given name.
   */
  private final StoreDataAccessorFactory sda_factory;

  /**
   * Mutex when accessing the top journal file.
   */
  private final Object top_journal_lock = new Object();

  /**
   * A thread that runs in the background and persists information that is in
   * the journal.  Only non-null between start() and stop().
   */
  private JournalingThread journaling_thread;

  /**
   * A debug log to output information to.
   */
  private final DebugLogger debug;
0110:
0111: JournalledSystem(File journal_path, boolean read_only,
0112: int page_size, StoreDataAccessorFactory sda_factory,
0113: DebugLogger debug, boolean enable_logging) {
0114: this .journal_path = journal_path;
0115: this .read_only = read_only;
0116: this .page_size = page_size;
0117: this .sda_factory = sda_factory;
0118: all_resources = new HashMap();
0119: journal_number = 0;
0120: journal_archives = new ArrayList();
0121: this .debug = debug;
0122: this .ENABLE_LOGGING = enable_logging;
0123: }
0124:
  /**
   * Returns a journal file name with the given number.  The journal number
   * must be between 10 and 73 inclusive - the 64 names that
   * newTopJournalFile cycles through via ((journal_number & 63) + 10), and
   * the range rollForwardRecover scans.
   */
  private static String getJournalFileName(int number) {
    if (number < 10 || number > 73) {
      throw new Error("Journal file name out of range.");
    }
    return "jnl" + number;
  }
0135:
  // Lock guarding startup/shutdown of the background journaling thread.
  private final Object init_lock = new Object();
0138:
0139: /**
0140: * Starts the journal system.
0141: */
0142: void start() throws IOException {
0143: if (ENABLE_LOGGING) {
0144: synchronized (init_lock) {
0145: if (journaling_thread == null) {
0146: // Start the background journaling thread,
0147: journaling_thread = new JournalingThread();
0148: journaling_thread.start();
0149: // Scan for any changes and make the changes.
0150: rollForwardRecover();
0151: if (!read_only) {
0152: // Create a new top journal file
0153: newTopJournalFile();
0154: }
0155: } else {
0156: throw new Error(
0157: "Assertion failed - already started.");
0158: }
0159: }
0160: }
0161: }
0162:
0163: /**
0164: * Stops the journal system. This will persist any pending changes up to the
0165: * last check point and then finish.
0166: */
0167: void stop() throws IOException {
0168: if (ENABLE_LOGGING) {
0169: synchronized (init_lock) {
0170: if (journaling_thread != null) {
0171: // Stop the journal thread
0172: journaling_thread.persistArchives(0);
0173: journaling_thread.finish();
0174: journaling_thread.waitUntilFinished();
0175: journaling_thread = null;
0176: } else {
0177: throw new Error(
0178: "Assertion failed - already stopped.");
0179: }
0180: }
0181:
0182: if (!read_only) {
0183: // Close any remaining journals and roll forward recover (shouldn't
0184: // actually be necessary but just incase...)
0185: synchronized (top_journal_lock) {
0186: // Close all the journals
0187: int sz = journal_archives.size();
0188: for (int i = 0; i < sz; ++i) {
0189: JournalFile jf = (JournalFile) journal_archives
0190: .get(i);
0191: jf.close();
0192: }
0193: // Close the top journal
0194: topJournal().close();
0195: // Scan for journals and make the changes.
0196: rollForwardRecover();
0197: }
0198: }
0199:
0200: }
0201: }
0202:
0203: /**
0204: * Recovers any lost operations that are currently in the journal. This
0205: * retries all logged entries. This would typically be called before any
0206: * other IO operations.
0207: */
0208: void rollForwardRecover() throws IOException {
0209: // System.out.println("rollForwardRecover()");
0210:
0211: // The list of all journal files,
0212: ArrayList journal_files_list = new ArrayList();
0213:
0214: // Scan the journal path for any journal files.
0215: for (int i = 10; i < 74; ++i) {
0216: String journal_fn = getJournalFileName(i);
0217: File f = new File(journal_path, journal_fn);
0218: // If the journal exists, create a summary of the journal
0219: if (f.exists()) {
0220: if (read_only) {
0221: throw new IOException(
0222: "Journal file "
0223: + f
0224: + " exists for a read-only session. "
0225: + "There may not be any pending journals for a read-only session.");
0226: }
0227:
0228: JournalFile jf = new JournalFile(f, read_only);
0229: // Open the journal file for recovery. This will set various
0230: // information about the journal such as the last check point and the
0231: // id of the journal file.
0232: JournalSummary summary = jf.openForRecovery();
0233: // If the journal can be recovered from.
0234: if (summary.can_be_recovered) {
0235: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0236: debug.write(Lvl.INFORMATION, this , "Journal "
0237: + jf + " found - can be recovered.");
0238: }
0239: journal_files_list.add(summary);
0240: } else {
0241: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0242: debug.write(Lvl.INFORMATION, this , "Journal "
0243: + jf
0244: + " deleting - nothing to recover.");
0245: }
0246: // Otherwise close and delete it
0247: jf.closeAndDelete();
0248: }
0249: }
0250: }
0251:
0252: // if (journal_files_list.size() == 0) {
0253: // System.out.println("Nothing to recover.");
0254: // }
0255:
0256: // Sort the journal file list from oldest to newest. The oldest journals
0257: // are recovered first.
0258: Collections.sort(journal_files_list, journal_list_comparator);
0259:
0260: long last_journal_number = -1;
0261:
0262: // Persist the journals
0263: for (int i = 0; i < journal_files_list.size(); ++i) {
0264: JournalSummary summary = (JournalSummary) journal_files_list
0265: .get(i);
0266:
0267: // Check the resources for this summary
0268: ArrayList res_list = summary.resource_list;
0269: for (int n = 0; n < res_list.size(); ++n) {
0270: String resource_name = (String) res_list.get(n);
0271: // This puts the resource into the hash map.
0272: JournalledResource resource = createResource(resource_name);
0273: }
0274:
0275: // Assert that we are recovering the journals in the correct order
0276: JournalFile jf = summary.journal_file;
0277: if (jf.journal_number < last_journal_number) {
0278: throw new Error("Assertion failed, sort failed.");
0279: }
0280: last_journal_number = jf.journal_number;
0281:
0282: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0283: debug.write(Lvl.INFORMATION, this , "Recovering: " + jf
0284: + " (8 .. " + summary.last_checkpoint + ")");
0285: }
0286:
0287: jf.persist(8, summary.last_checkpoint);
0288: // Then close and delete.
0289: jf.closeAndDelete();
0290:
0291: // Check the resources for this summary and close them
0292: for (int n = 0; n < res_list.size(); ++n) {
0293: String resource_name = (String) res_list.get(n);
0294: AbstractResource resource = (AbstractResource) createResource(resource_name);
0295: // When we finished, make sure the resource is closed again
0296: // Close the resource
0297: resource.persistClose();
0298: // Post recover notification
0299: resource.notifyPostRecover();
0300: }
0301:
0302: }
0303:
0304: }
0305:
0306: private Comparator journal_list_comparator = new Comparator() {
0307:
0308: public int compare(Object ob1, Object ob2) {
0309: JournalSummary js1 = (JournalSummary) ob1;
0310: JournalSummary js2 = (JournalSummary) ob2;
0311:
0312: long jn1 = js1.journal_file.getJournalNumber();
0313: long jn2 = js2.journal_file.getJournalNumber();
0314:
0315: if (jn1 > jn2) {
0316: return 1;
0317: } else if (jn1 < jn2) {
0318: return -1;
0319: } else {
0320: return 0;
0321: }
0322: }
0323:
0324: };
0325:
0326: /**
0327: * Creates a new top journal file.
0328: */
0329: private void newTopJournalFile() throws IOException {
0330: // // Move the old journal to the archive?
0331: // if (top_journal_file != null) {
0332: // journal_archives.add(top_journal_file);
0333: // }
0334:
0335: String journal_fn = getJournalFileName((int) ((journal_number & 63) + 10));
0336: ++journal_number;
0337:
0338: File f = new File(journal_path, journal_fn);
0339: if (f.exists()) {
0340: throw new IOException("Journal file already exists.");
0341: }
0342:
0343: top_journal_file = new JournalFile(f, read_only);
0344: top_journal_file.open(journal_number - 1);
0345: }
0346:
  /**
   * Returns the current top journal file (the journal that new log entries
   * are written to).
   */
  private JournalFile topJournal() {
    synchronized (top_journal_lock) {
      return top_journal_file;
    }
  }
0355:
  /**
   * Returns the resource with the given name, creating and registering it in
   * 'all_resources' the first time the name is requested in this session.
   */
  public JournalledResource createResource(String resource_name) {
    AbstractResource resource;
    synchronized (all_resources) {
      // Has this resource previously been open?
      resource = (AbstractResource) all_resources.get(resource_name);
      if (resource == null) {
        // No...
        // Create a unique id for this
        final long id = seq_id;
        ++seq_id;
        // Create the StoreDataAccessor for this resource.
        StoreDataAccessor accessor =
                         sda_factory.createStoreDataAccessor(resource_name);
        // A logging session journals modifications; otherwise modifications
        // are applied directly to the backing accessor.
        if (ENABLE_LOGGING) {
          resource = new Resource(resource_name, id, accessor);
        } else {
          resource = new NonLoggingResource(resource_name, id, accessor);
        }
        // Put this in the map.
        all_resources.put(resource_name, resource);
      }
    }

    // Return the resource
    return resource;
  }
0387:
  /**
   * Sets a check point in the log.  If 'flush_journals' is true then when the
   * method returns we are guarenteed that all the journals are flushed and the
   * data is absolutely current.  If 'flush_journals' is false then we can't
   * assume the journals will be empty when the method returns.
   */
  void setCheckPoint(boolean flush_journals) throws IOException {
    // No-op when logging is disabled.
    if (!ENABLE_LOGGING) {
      return;
    }
    // Return if read-only
    if (read_only) {
      return;
    }

    boolean something_to_persist;

    synchronized (top_journal_lock) {
      JournalFile top_j = topJournal();

      // When the journal exceeds a threshold (256KB), or a flush was
      // explicitly requested, we cycle the top journal.
      if (flush_journals || top_j.size() > (256 * 1024)) {
        // Cycle to the next journal file
        newTopJournalFile();
        // Add this to the archives
        journal_archives.add(top_j);
      }
      something_to_persist = journal_archives.size() > 0;
      // NOTE(review): the check point entry is written to 'top_j', which is
      //   the OLD journal when we cycled above - presumably so the archived
      //   journal ends on a recoverable check point.  Confirm before
      //   restructuring this method.
      top_j.setCheckPoint();
    }

    if (something_to_persist) {
      // Notifies the background thread that there is something to persist.
      // This will block until there are at most 10 journal files open.
      journaling_thread.persistArchives(10);
    }

  }
0427:
  /**
   * Returns the AbstractResource previously registered under the given name
   * by createResource, or null if no such resource exists in this session.
   */
  private AbstractResource getResource(String resource_name) {
    synchronized (all_resources) {
      return (AbstractResource) all_resources.get(resource_name);
    }
  }
0436:
0437: // ---------- Inner classes ----------
0438:
0439: /**
0440: * A JournalFile represents a file in which modification are logged out to
0441: * when changes are made. A JournalFile contains instructions for rebuilding
0442: * a resource to a known stable state.
0443: */
0444: private final class JournalFile {
0445:
    /**
     * The File object of this journal in the file system.
     */
    private File file;

    /**
     * True if the journal file is read only.
     */
    private boolean read_only;

    /**
     * The StreamFile object for reading and writing entries to/from the
     * journal.  Set to null once the journal is closed (see isDeleted).
     */
    private StreamFile data;

    /**
     * A DataOutputStream object used to write entries to the journal file.
     */
    private DataOutputStream data_out;

    /**
     * Small buffer used to read the fixed 36-byte page-modification entry
     * header (see buildPage).
     */
    private byte[] buffer;

    /**
     * A map between a resource name and an id for this journal file,
     * populated by writeResourceName.
     */
    private HashMap resource_id_map;

    /**
     * The sequence id for resources modified in this log.
     */
    private long cur_seq_id;

    /**
     * The journal number of this journal (the first 8 bytes of the file).
     */
    private long journal_number;

    /**
     * True when open.
     */
    private boolean is_open;

    /**
     * The number of threads currently looking at info in this journal.
     * Starts at 1 (the journal's own reference); the file is only actually
     * deleted when this drops to 0 (see closeAndDelete).
     */
    private int reference_count;
0496:
    /**
     * Constructs the journal file over the given File.  The file itself is
     * not touched until open() or openForRecovery() is called.
     */
    public JournalFile(File file, boolean read_only) {
      this.file = file;
      this.read_only = read_only;
      this.is_open = false;
      // 36 bytes = the fixed header size of a page-modification entry
      // (see buildPage).
      buffer = new byte[36];
      resource_id_map = new HashMap();
      cur_seq_id = 0;
      // The journal holds one reference on itself, dropped by closeAndDelete.
      reference_count = 1;
    }
0509:
    /**
     * Returns the size of the journal file in bytes.  Only valid while the
     * journal is open ('data' is set to null on close).
     */
    long size() {
      return data.length();
    }
0516:
    /**
     * Returns the journal number assigned to this journal (written as the
     * first 8 bytes of the file by open, or read back by openForRecovery).
     */
    long getJournalNumber() {
      return journal_number;
    }
0523:
0524: /**
0525: * Opens the journal file. If the journal file exists then an error is
0526: * generated.
0527: */
0528: void open(long journal_number) throws IOException {
0529: if (is_open) {
0530: throw new IOException("Journal file is already open.");
0531: }
0532: if (file.exists()) {
0533: throw new IOException("Journal file already exists.");
0534: }
0535:
0536: this .journal_number = journal_number;
0537: data = new StreamFile(file, read_only ? "r" : "rw");
0538: data_out = new DataOutputStream(new BufferedOutputStream(
0539: data.getOutputStream()));
0540: data_out.writeLong(journal_number);
0541: is_open = true;
0542: }
0543:
    /**
     * Opens the journal for recovery.  This scans the journal and generates
     * some statistics about the journal file such as the last check point and
     * the journal number.  If the journal file doesn't exist then an error is
     * generated.
     * <p>
     * Entry format scanned here: 8-byte type tag, 4-byte body size, body.
     * Type 100 is a check point, type 2 declares a (id -> resource name)
     * tag; types 1..7 are valid entry tags, anything else ends the scan.
     */
    JournalSummary openForRecovery() throws IOException {
      if (is_open) {
        throw new IOException("Journal file is already open.");
      }
      if (!file.exists()) {
        throw new IOException("Journal file does not exists.");
      }

      // Open the random access file to this journal
      data = new StreamFile(file, read_only ? "r" : "rw");
      // data_out = new DataOutputStream(
      //                 new BufferedOutputStream(data.getOutputStream()));

      is_open = true;

      // Create the summary object (by default, not recoverable).
      JournalSummary summary = new JournalSummary(this);

      long end_pointer = data.length();

      // If end_pointer < 8 then can't recover this journal (not even the
      // journal number header is complete).
      if (end_pointer < 8) {
        return summary;
      }

      // The input stream.
      final DataInputStream din = new DataInputStream(
                         new BufferedInputStream(data.getInputStream()));

      try {
        // Set the journal number for this
        this.journal_number = din.readLong();
        long position = 8;

        // Resource names declared since the last check point seen; they only
        // count once a check point that covers them is found.
        ArrayList checkpoint_res_list = new ArrayList();

        // Start scan
        while (true) {

          // If we can't read 12 bytes ahead, return the summary
          if (position + 12 > end_pointer) {
            return summary;
          }

          long type = din.readLong();
          int size = din.readInt();

          position = position + size + 12;

          boolean skip_body = true;

          // If checkpoint reached then we are recoverable
          if (type == 100) {
            summary.last_checkpoint = position;
            summary.can_be_recovered = true;

            // Add the resources in this check point
            summary.resource_list.addAll(checkpoint_res_list);
            // And clear the temporary list.
            checkpoint_res_list.clear();

          }

          // If end reached, or type is not understood then return
          else if (position >= end_pointer || type < 1 || type > 7) {
            return summary;
          }

          // If we are resource type, then load the resource
          if (type == 2) {

            // We don't skip body for this type, we read the content
            skip_body = false;
            long id = din.readLong();
            int str_len = din.readInt();
            StringBuffer str = new StringBuffer(str_len);
            for (int i = 0; i < str_len; ++i) {
              str.append(din.readChar());
            }

            String resource_name = new String(str);
            checkpoint_res_list.add(resource_name);

          }

          if (skip_body) {
            // skip() may skip fewer bytes than asked, so loop until the
            // whole body is consumed.
            int to_skip = size;
            while (to_skip > 0) {
              to_skip -= din.skip(to_skip);
            }
          }

        }

      } finally {
        din.close();
      }

    }
0652:
    /**
     * Closes the journal file and releases the backing StreamFile ('data'
     * doubles as the open/closed flag - see isDeleted).
     */
    void close() throws IOException {
      synchronized (this) {
        if (!is_open) {
          throw new IOException("Journal file is already closed.");
        }

        data.close();
        data = null;
        is_open = false;
      }
    }
0668:
    /**
     * Returns true once the backing StreamFile has been released (by close,
     * which closeAndDelete calls before deleting the file).
     */
    boolean isDeleted() {
      synchronized (this) {
        return data == null;
      }
    }
0677:
0678: /**
0679: * Closes and deletes the journal file. This may not immediately close and
0680: * delete the journal file if there are currently references to it (for
0681: * example, in the middle of a read operation).
0682: */
0683: void closeAndDelete() throws IOException {
0684: synchronized (this ) {
0685: --reference_count;
0686: if (reference_count == 0) {
0687: // Close and delete the journal file.
0688: close();
0689: boolean b = file.delete();
0690: if (!b) {
0691: System.out
0692: .println("Unable to delete journal file: "
0693: + file);
0694: }
0695: }
0696: }
0697: }
0698:
    /**
     * Adds a reference preventing the journal file from being deleted.
     * Does nothing once the count has already dropped to zero (the journal
     * is then already closed and deleted).
     */
    void addReference() {
      synchronized (this) {
        if (reference_count != 0) {
          ++reference_count;
        }
      }
    }
0709:
    /**
     * Removes a reference; when the last reference is removed the journal
     * file is closed and deleted.  (Delegates to closeAndDelete, which only
     * acts once the reference count reaches zero.)
     */
    void removeReference() throws IOException {
      closeAndDelete();
    }
0717:
0718: /**
0719: * Plays the log from the given offset in the file to the next checkpoint.
0720: * This will actually persist the log. Returns -1 if the end of the journal
0721: * is reached.
0722: * <p>
0723: * NOTE: This will not verify that the journal is correct. Verification
0724: * should be done before the persist.
0725: */
0726: void persist(final long start, final long end)
0727: throws IOException {
0728:
0729: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0730: debug.write(Lvl.INFORMATION, this , "Persisting: "
0731: + file);
0732: }
0733:
0734: final DataInputStream din = new DataInputStream(
0735: new BufferedInputStream(data.getInputStream()));
0736: long count = start;
0737: // Skip to the offset
0738: while (count > 0) {
0739: count -= din.skip(count);
0740: }
0741:
0742: // The list of resources we updated
0743: ArrayList resources_updated = new ArrayList();
0744:
0745: // A map from resource id to resource name for this journal.
0746: HashMap id_name_map = new HashMap();
0747:
0748: boolean finished = false;
0749: long position = start;
0750:
0751: while (!finished) {
0752: long type = din.readLong();
0753: int size = din.readInt();
0754: position = position + size + 12;
0755:
0756: if (type == 2) { // Resource id tag
0757: long id = din.readLong();
0758: int len = din.readInt();
0759: StringBuffer buf = new StringBuffer(len);
0760: for (int i = 0; i < len; ++i) {
0761: buf.append(din.readChar());
0762: }
0763: String resource_name = new String(buf);
0764:
0765: // Put this in the map
0766: id_name_map.put(new Long(id), resource_name);
0767:
0768: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0769: debug.write(Lvl.INFORMATION, this ,
0770: "Journal Command: Tag: " + id + " = "
0771: + resource_name);
0772: }
0773:
0774: // Add this to the list of resources we updated.
0775: resources_updated.add(getResource(resource_name));
0776:
0777: } else if (type == 6) { // Resource delete
0778: long id = din.readLong();
0779: String resource_name = (String) id_name_map
0780: .get(new Long(id));
0781: AbstractResource resource = getResource(resource_name);
0782:
0783: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0784: debug.write(Lvl.INFORMATION, this ,
0785: "Journal Command: Delete: "
0786: + resource_name);
0787: }
0788:
0789: resource.persistDelete();
0790:
0791: } else if (type == 3) { // Resource size change
0792: long id = din.readLong();
0793: long new_size = din.readLong();
0794: String resource_name = (String) id_name_map
0795: .get(new Long(id));
0796: AbstractResource resource = getResource(resource_name);
0797:
0798: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0799: debug.write(Lvl.INFORMATION, this ,
0800: "Journal Command: Set Size: "
0801: + resource_name + " size = "
0802: + new_size);
0803: }
0804:
0805: resource.persistSetSize(new_size);
0806:
0807: } else if (type == 1) { // Page modification
0808: long id = din.readLong();
0809: long page = din.readLong();
0810: int off = din.readInt();
0811: int len = din.readInt();
0812:
0813: String resource_name = (String) id_name_map
0814: .get(new Long(id));
0815: AbstractResource resource = getResource(resource_name);
0816:
0817: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0818: debug.write(Lvl.INFORMATION, this ,
0819: "Journal Command: Page Modify: "
0820: + resource_name + " page = "
0821: + page + " off = " + off
0822: + " len = " + len);
0823: }
0824:
0825: resource.persistPageChange(page, off, len, din);
0826:
0827: } else if (type == 100) { // Checkpoint (end)
0828:
0829: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0830: debug.write(Lvl.INFORMATION, this ,
0831: "Journal Command: Check Point.");
0832: }
0833:
0834: if (position == end) {
0835: finished = true;
0836: }
0837: }
0838:
0839: else {
0840: throw new Error("Unknown tag type: " + type
0841: + " position = " + position);
0842: }
0843:
0844: } // while (!finished)
0845:
0846: // Synch all the resources that we have updated.
0847: int sz = resources_updated.size();
0848: for (int i = 0; i < sz; ++i) {
0849: AbstractResource r = (AbstractResource) resources_updated
0850: .get(i);
0851: if (debug.isInterestedIn(Lvl.INFORMATION)) {
0852: debug.write(Lvl.INFORMATION, this , "Synch: " + r);
0853: }
0854: r.synch();
0855: }
0856:
0857: din.close();
0858:
0859: }
0860:
    /**
     * Returns the id this journal assigned to the given resource name,
     * writing a type 2 'tag' entry declaring the (id -> name) mapping the
     * first time a name is seen in this journal.
     *
     * @return the resource id as a Long (cached in resource_id_map).
     */
    private Long writeResourceName(String resource_name,
                                   DataOutputStream out) throws IOException {
      Long v;
      synchronized (resource_id_map) {
        v = (Long) resource_id_map.get(resource_name);
        if (v == null) {
          ++cur_seq_id;

          int len = resource_name.length();

          // Entry: type 2, body = 8-byte id + 4-byte name length + the name
          // as 2-byte chars.
          out.writeLong(2);
          out.writeInt(8 + 4 + (len * 2));
          out.writeLong(cur_seq_id);
          out.writeInt(len);
          out.writeChars(resource_name);

          // Put this id in the cache
          v = new Long(cur_seq_id);
          resource_id_map.put(resource_name, v);
        }
      }

      return v;
    }
0890:
0891: /**
0892: * Logs that a resource was deleted.
0893: */
0894: void logResourceDelete(String resource_name) throws IOException {
0895:
0896: synchronized (this ) {
0897: // Build the header,
0898: Long v = writeResourceName(resource_name, data_out);
0899:
0900: // Write the header
0901: long resource_id = v.longValue();
0902: data_out.writeLong(6);
0903: data_out.writeInt(8);
0904: data_out.writeLong(resource_id);
0905:
0906: }
0907:
0908: }
0909:
0910: /**
0911: * Logs a resource size change.
0912: */
0913: void logResourceSizeChange(String resource_name, long new_size)
0914: throws IOException {
0915: synchronized (this ) {
0916: // Build the header,
0917: Long v = writeResourceName(resource_name, data_out);
0918:
0919: // Write the header
0920: long resource_id = v.longValue();
0921: data_out.writeLong(3);
0922: data_out.writeInt(8 + 8);
0923: data_out.writeLong(resource_id);
0924: data_out.writeLong(new_size);
0925:
0926: }
0927:
0928: }
0929:
    /**
     * Writes a check point entry (type 100, empty body) to the log, then
     * flushes and synchs the journal file so everything up to this point is
     * durable on disk.
     */
    void setCheckPoint() throws IOException {
      synchronized (this) {

        data_out.writeLong(100);
        data_out.writeInt(0);

        // Flush and synch the journal file
        flushAndSynch();
      }
    }
0943:
    /**
     * Logs a page modification (type 1 entry) to the end of the log and
     * returns a JournalEntry pointing at the entry so the modification can
     * later be rebuilt with buildPage.
     */
    JournalEntry logPageModification(String resource_name,
                long page_number, byte[] buf, int off, int len)
                                                       throws IOException {

      long ref;
      synchronized (this) {
        // Make sure a tag entry maps the name to an id in this journal.
        Long v = writeResourceName(resource_name, data_out);

        // The absolute position of the page,
        final long absolute_position = page_number * page_size;

        // Write the header
        long resource_id = v.longValue();
        data_out.writeLong(1);
        data_out.writeInt(8 + 8 + 4 + 4 + len);
        data_out.writeLong(resource_id);
        // data_out.writeLong(page_number);
        // data_out.writeInt(off);
        // NOTE(review): the page number and offset are re-derived against a
        //   hard-coded 8192-byte page rather than 'page_size' - presumably
        //   to make the journal format independent of the session page size.
        //   Confirm before changing either constant (buildPage reads these
        //   fields back verbatim).
        data_out.writeLong(absolute_position / 8192);
        data_out.writeInt(off + (int) (absolute_position & 8191));
        data_out.writeInt(len);

        data_out.write(buf, off, len);

        // Flush the changes so we can work out the pointer.
        data_out.flush();
        // The fixed entry header is 36 bytes (see buildPage), so the entry
        // starts (36 + len) bytes before the current end of file.
        ref = data.length() - len - 36;
      }

      // Returns a JournalEntry object
      return new JournalEntry(resource_name, this, ref, page_number);
    }
0983:
    /**
     * Reconstructs a page modification that is logged in this journal,
     * copying the logged bytes into 'buf' at (off + logged page offset).
     *
     * @param in_page_number the page the caller expects at this entry
     *   (asserted against the logged page number).
     * @param position byte offset of the entry, as returned by
     *   logPageModification.
     */
    void buildPage(long in_page_number, long position, byte[] buf,
                   int off) throws IOException {
      long type;
      long resource_id;
      long page_number;
      int page_offset;
      int page_length;

      synchronized (this) {
        // Read the fixed 36-byte entry header:
        //   0: type (8 bytes), 8: body size (4), 12: resource id (8),
        //   20: page number (8), 28: page offset (4), 32: data length (4).
        data.readFully(position, buffer, 0, 36);
        type = ByteArrayUtil.getLong(buffer, 0);
        resource_id = ByteArrayUtil.getLong(buffer, 12);
        page_number = ByteArrayUtil.getLong(buffer, 20);
        page_offset = ByteArrayUtil.getInt(buffer, 28);
        page_length = ByteArrayUtil.getInt(buffer, 32);

        // Some asserts,
        if (type != 1) {
          throw new IOException("Invalid page type. type = " + type +
                                " pos = " + position);
        }
        if (page_number != in_page_number) {
          throw new IOException("Page numbers do not match.");
        }

        // Read the content.
        data.readFully(position + 36, buf, off + page_offset, page_length);
      }

    }
1018:
    /**
     * Flushes any buffered entries to the file and forces the file contents
     * onto the storage device.
     */
    void flushAndSynch() throws IOException {
      synchronized (this) {
        data_out.flush();
        data.synch();
      }
    }
1028:
    /**
     * Returns a short description of this journal for log messages.
     */
    public String toString() {
      return "[JOURNAL: " + file.getName() + "]";
    }
1032:
1033: }
1034:
  /**
   * A JournalEntry represents a modification that has been logged in the
   * journal for a specific page of a resource.  It contains the name of the
   * log file, the position in the journal of the modification, and the page
   * number.
   */
  private static final class JournalEntry {

    /**
     * The name of the resource the modified page belongs to.
     */
    private final String resource_name;

    /**
     * The journal file holding the logged modification.
     */
    private final JournalFile journal;

    /**
     * The byte offset of the modification entry in the journal file.
     */
    private final long position;

    /**
     * The page number of this modification.
     */
    private final long page_number;

    /**
     * The next journal entry with the same page number.  (Link maintained
     * by code outside this class - not used within this file chunk.)
     */
    JournalEntry next_page;

    /**
     * Constructs the entry.
     */
    public JournalEntry(String resource_name, JournalFile journal,
                        long position, long page_number) {
      this.resource_name = resource_name;
      this.journal = journal;
      this.position = position;
      this.page_number = page_number;
    }

    /**
     * Returns the journal file for this entry.
     */
    public JournalFile getJournalFile() {
      return journal;
    }

    /**
     * Returns the position of the log entry in the journal file.
     */
    public long getPosition() {
      return position;
    }

    /**
     * Returns the page number of this modification log entry.
     */
    public long getPageNumber() {
      return page_number;
    }

  }
1101:
  /**
   * Base class for a journalled resource: pairs a resource name and a
   * session-unique id with the StoreDataAccessor that backs it on disk.
   * createResource chooses the subclass - journalled (Resource) when logging
   * is enabled, direct (NonLoggingResource) otherwise.
   */
  private abstract class AbstractResource implements JournalledResource {

    /**
     * The unique name given this resource (the file name).
     */
    protected final String name;

    /**
     * The id assigned to this resource by this session.  This id should not
     * be used in any external source.
     */
    protected final long id;

    /**
     * The backing object.
     */
    protected final StoreDataAccessor data;

    /**
     * True if this resource is read_only.
     */
    protected boolean read_only;

    /**
     * Constructs the resource.
     */
    AbstractResource(String name, long id, StoreDataAccessor data) {
      this.name = name;
      this.id = id;
      this.data = data;
    }

    // ---------- Persist methods ----------
    // Called during journal replay (JournalFile.persist /
    // rollForwardRecover) to apply logged operations to the backing store.

    abstract void persistClose() throws IOException;

    abstract void persistDelete() throws IOException;

    abstract void persistSetSize(final long new_size) throws IOException;

    abstract void persistPageChange(final long page, final int off,
                          int len, DataInputStream din) throws IOException;

    // Flushes applied changes to the storage device.
    abstract void synch() throws IOException;

    // Called after a rollForwardRecover to notify the resource to update its
    // state to reflect the fact that changes have occurred.
    abstract void notifyPostRecover();

    // ----------

    /**
     * Returns the size of the page.
     */
    public int getPageSize() {
      return page_size;
    }

    /**
     * Returns the unique id of this page.
     */
    public long getID() {
      return id;
    }

    /**
     * Returns the resource name.
     */
    public String toString() {
      return name;
    }

  }
1177:
1178: /**
1179: * An implementation of AbstractResource that doesn't log.
1180: */
1181: private final class NonLoggingResource extends AbstractResource {
1182:
1183: /**
1184: * Constructs the resource.
1185: */
1186: NonLoggingResource(String name, long id, StoreDataAccessor data) {
1187: super (name, id, data);
1188: }
1189:
1190: // ---------- Persist methods ----------
1191:
1192: void persistClose() throws IOException {
1193: // No-op
1194: }
1195:
1196: public void persistDelete() throws IOException {
1197: // No-op
1198: }
1199:
1200: public void persistSetSize(final long new_size)
1201: throws IOException {
1202: // No-op
1203: }
1204:
1205: public void persistPageChange(final long page, final int off,
1206: int len, DataInputStream din) throws IOException {
1207: // No-op
1208: }
1209:
1210: public void synch() throws IOException {
1211: data.synch();
1212: }
1213:
1214: public void notifyPostRecover() {
1215: // No-op
1216: }
1217:
1218: // ----------
1219:
1220: /**
1221: * Opens the resource.
1222: */
1223: public void open(boolean read_only) throws IOException {
1224: this .read_only = read_only;
1225: data.open(read_only);
1226: }
1227:
1228: /**
1229: * Reads a page from the resource.
1230: */
1231: public void read(final long page_number, final byte[] buf,
1232: final int off) throws IOException {
1233: // Read the data.
1234: long page_position = page_number * page_size;
1235: data.read(page_position + off, buf, off, page_size);
1236: }
1237:
1238: /**
1239: * Writes a page of some previously specified size.
1240: */
1241: public void write(final long page_number, byte[] buf, int off,
1242: int len) throws IOException {
1243: long page_position = page_number * page_size;
1244: data.write(page_position + off, buf, off, len);
1245: }
1246:
1247: /**
1248: * Sets the size of the resource.
1249: */
1250: public void setSize(long size) throws IOException {
1251: data.setSize(size);
1252: }
1253:
1254: /**
1255: * Returns the size of this resource.
1256: */
1257: public long getSize() throws IOException {
1258: return data.getSize();
1259: }
1260:
1261: /**
1262: * Closes the resource.
1263: */
1264: public void close() throws IOException {
1265: data.close();
1266: }
1267:
1268: /**
1269: * Deletes the resource.
1270: */
1271: public void delete() throws IOException {
1272: data.delete();
1273: }
1274:
1275: /**
1276: * Returns true if the resource currently exists.
1277: */
1278: public boolean exists() {
1279: return data.exists();
1280: }
1281:
1282: }
1283:
1284: /**
1285: * Represents a resource in this system. A resource is backed by a
1286: * StoreDataAccessor and may have one or more modifications to it in the
1287: * journal.
1288: */
1289: private final class Resource extends AbstractResource {
1290:
1291: /**
1292: * The size of the resource.
1293: */
1294: private long size;
1295:
1296: /**
1297: * True if there is actually data to be read in the above object.
1298: */
1299: private boolean there_is_backing_data;
1300:
1301: /**
1302: * True if the underlying resource is really open.
1303: */
1304: private boolean really_open;
1305:
1306: /**
1307: * True if the data store exists.
1308: */
1309: private boolean data_exists;
1310:
1311: /**
1312: * True if the data resource is open.
1313: */
1314: private boolean data_open;
1315:
1316: /**
1317: * True if the data resource was deleted.
1318: */
1319: private boolean data_deleted;
1320:
1321: /**
1322: * The hash of all journal entries on this resource (JournalEntry).
1323: */
1324: private final JournalEntry[] journal_map;
1325:
1326: /**
1327: * A temporary buffer the size of a page.
1328: */
1329: private final byte[] page_buffer;
1330:
1331: /**
1332: * Constructs the resource.
1333: */
1334: Resource(String name, long id, StoreDataAccessor data) {
1335: super (name, id, data);
1336: journal_map = new JournalEntry[257];
1337: data_open = false;
1338: data_exists = data.exists();
1339: data_deleted = false;
1340: if (data_exists) {
1341: try {
1342: size = data.getSize();
1343: // System.out.println("Setting size of " + name + " to " + size);
1344: } catch (IOException e) {
1345: throw new Error("Error getting size of resource: "
1346: + e.getMessage());
1347: }
1348: }
1349: really_open = false;
1350: page_buffer = new byte[page_size];
1351: }
1352:
1353: // ---------- Persist methods ----------
1354:
1355: private void persistOpen(boolean read_only) throws IOException {
1356: // System.out.println(name + " Open");
1357: if (!really_open) {
1358: data.open(read_only);
1359: there_is_backing_data = true;
1360: really_open = true;
1361: }
1362: }
1363:
1364: void persistClose() throws IOException {
1365: // System.out.println(name + " Close");
1366: if (really_open) {
1367: // When we close we reset the size attribute. We do this because of
1368: // the roll forward recovery.
1369: size = data.getSize();
1370: data.synch();
1371: data.close();
1372: really_open = false;
1373: }
1374: }
1375:
1376: public void persistDelete() throws IOException {
1377: // System.out.println(name + " Delete");
1378: // If open then close
1379: if (really_open) {
1380: persistClose();
1381: }
1382: data.delete();
1383: there_is_backing_data = false;
1384: }
1385:
1386: public void persistSetSize(final long new_size)
1387: throws IOException {
1388: // System.out.println(name + " Set Size " + size);
1389: // If not open then open.
1390: if (!really_open) {
1391: persistOpen(false);
1392: }
1393: // Don't let us set a size that's smaller than the current size.
1394: if (new_size > data.getSize()) {
1395: data.setSize(new_size);
1396: }
1397: }
1398:
1399: public void persistPageChange(final long page, final int off,
1400: int len, DataInputStream din) throws IOException {
1401: if (!really_open) {
1402: persistOpen(false);
1403: }
1404:
1405: // Buffer to read the page content into
1406: byte[] buf;
1407: if (len <= page_buffer.length) {
1408: // If length is smaller or equal to the size of a page then use the
1409: // local page buffer.
1410: buf = page_buffer;
1411: } else {
1412: // Otherwise create a new buffer of the required size (this may happen
1413: // if the page size changes between sessions).
1414: buf = new byte[len];
1415: }
1416:
1417: // Read the change from the input stream
1418: din.readFully(buf, 0, len);
1419: // Write the change out to the underlying resource container
1420: long pos = page * 8192; //page_size;
1421: data.write(pos + off, buf, 0, len);
1422: }
1423:
1424: public void synch() throws IOException {
1425: if (really_open) {
1426: data.synch();
1427: }
1428: }
1429:
1430: public void notifyPostRecover() {
1431: data_exists = data.exists();
1432: }
1433:
1434: // ----------
1435:
1436: /**
1437: * Opens the resource. This method will check if the resource exists. If
1438: * it doesn't exist the 'read' method will return just the journal
1439: * modifications of a page. If it does exist it opens the resource and uses
1440: * that as the backing to any 'read' operations.
1441: */
1442: public void open(boolean read_only) throws IOException {
1443: this .read_only = read_only;
1444:
1445: if (!data_deleted && data.exists()) {
1446: // It does exist so open it.
1447: persistOpen(read_only);
1448: } else {
1449: there_is_backing_data = false;
1450: data_deleted = false;
1451: }
1452: data_open = true;
1453: data_exists = true;
1454: }
1455:
1456: /**
1457: * Reads a page from the resource. This method reconstructs the page
1458: * from the underlying data, and from any journal entries. This should
1459: * read the data to be put into a buffer in memory.
1460: */
1461: public void read(final long page_number, final byte[] buf,
1462: final int off) throws IOException {
1463:
1464: synchronized (journal_map) {
1465: if (!data_open) {
1466: throw new IOException(
1467: "Assertion failed: Data file is not open.");
1468: }
1469: }
1470:
1471: // The list of all journal entries on this page number
1472: final ArrayList all_journal_entries = new ArrayList(4);
1473: try {
1474: // The map index.
1475: synchronized (journal_map) {
1476: int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
1477: JournalEntry entry = (JournalEntry) journal_map[i];
1478: JournalEntry prev = null;
1479:
1480: while (entry != null) {
1481: boolean deleted_hash = false;
1482:
1483: JournalFile file = entry.getJournalFile();
1484: // Note that once we have a reference the journal file can not be
1485: // deleted.
1486: file.addReference();
1487:
1488: // If the file is closed (or deleted)
1489: if (file.isDeleted()) {
1490: deleted_hash = true;
1491: // Deleted so remove the reference to the journal
1492: file.removeReference();
1493: // Remove the journal entry from the chain.
1494: if (prev == null) {
1495: journal_map[i] = entry.next_page;
1496: } else {
1497: prev.next_page = entry.next_page;
1498: }
1499: }
1500: // Else if not closed then is this entry the page number?
1501: else if (entry.getPageNumber() == page_number) {
1502: all_journal_entries.add(entry);
1503: } else {
1504: // Not the page we are looking for so remove the reference to the
1505: // file.
1506: file.removeReference();
1507: }
1508:
1509: // Only move prev is we have NOT deleted a hash entry
1510: if (!deleted_hash) {
1511: prev = entry;
1512: }
1513: entry = entry.next_page;
1514: }
1515: }
1516:
1517: // Read any data from the underlying file
1518: if (there_is_backing_data) {
1519: long page_position = page_number * page_size;
1520: // First read the page from the underlying store.
1521: data.read(page_position, buf, off, page_size);
1522: } else {
1523: // Clear the buffer
1524: for (int i = off; i < (page_size + off); ++i) {
1525: buf[i] = 0;
1526: }
1527: }
1528:
1529: // Rebuild from the journal file(s)
1530: final int sz = all_journal_entries.size();
1531: for (int i = 0; i < sz; ++i) {
1532: JournalEntry entry = (JournalEntry) all_journal_entries
1533: .get(i);
1534: JournalFile file = entry.getJournalFile();
1535: final long position = entry.getPosition();
1536: synchronized (file) {
1537: file.buildPage(page_number, position, buf, off);
1538: }
1539: }
1540:
1541: } finally {
1542:
1543: // Make sure we remove the reference for all the journal files.
1544: final int sz = all_journal_entries.size();
1545: for (int i = 0; i < sz; ++i) {
1546: JournalEntry entry = (JournalEntry) all_journal_entries
1547: .get(i);
1548: JournalFile file = entry.getJournalFile();
1549: file.removeReference();
1550: }
1551:
1552: }
1553:
1554: }
1555:
1556: /**
1557: * Writes a page of some previously specified size to the top log. This
1558: * will add a single entry to the log and any 'read' operations after will
1559: * contain the written data.
1560: */
1561: public void write(final long page_number, byte[] buf, int off,
1562: int len) throws IOException {
1563:
1564: synchronized (journal_map) {
1565: if (!data_open) {
1566: throw new IOException(
1567: "Assertion failed: Data file is not open.");
1568: }
1569:
1570: // Make this modification in the log
1571: JournalEntry journal;
1572: synchronized (top_journal_lock) {
1573: journal = topJournal().logPageModification(name,
1574: page_number, buf, off, len);
1575: }
1576:
1577: // This adds the modification to the END of the hash list. This means
1578: // when we reconstruct the page the journals will always be in the
1579: // correct order - from oldest to newest.
1580:
1581: // The map index.
1582: int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
1583: JournalEntry entry = (JournalEntry) journal_map[i];
1584: // Make sure this entry is added to the END
1585: if (entry == null) {
1586: // Add at the head if no first entry
1587: journal_map[i] = journal;
1588: journal.next_page = null;
1589: } else {
1590: // Otherwise search to the end
1591: // The number of journal entries in the linked list
1592: int journal_entry_count = 0;
1593: while (entry.next_page != null) {
1594: entry = entry.next_page;
1595: ++journal_entry_count;
1596: }
1597: // and add to the end
1598: entry.next_page = journal;
1599: journal.next_page = null;
1600:
1601: // If there are over 35 journal entries, scan and remove all entries
1602: // on journals that have persisted
1603: if (journal_entry_count > 35) {
1604: int entries_cleaned = 0;
1605: entry = (JournalEntry) journal_map[i];
1606: JournalEntry prev = null;
1607:
1608: while (entry != null) {
1609: boolean deleted_hash = false;
1610:
1611: JournalFile file = entry.getJournalFile();
1612: // Note that once we have a reference the journal file can not be
1613: // deleted.
1614: file.addReference();
1615:
1616: // If the file is closed (or deleted)
1617: if (file.isDeleted()) {
1618: deleted_hash = true;
1619: // Deleted so remove the reference to the journal
1620: file.removeReference();
1621: // Remove the journal entry from the chain.
1622: if (prev == null) {
1623: journal_map[i] = entry.next_page;
1624: } else {
1625: prev.next_page = entry.next_page;
1626: }
1627: ++entries_cleaned;
1628: }
1629: // Remove the reference
1630: file.removeReference();
1631:
1632: // Only move prev is we have NOT deleted a hash entry
1633: if (!deleted_hash) {
1634: prev = entry;
1635: }
1636: entry = entry.next_page;
1637: }
1638:
1639: }
1640: }
1641: }
1642:
1643: }
1644:
1645: /**
1646: * Sets the size of the resource.
1647: */
1648: public void setSize(long size) throws IOException {
1649: synchronized (journal_map) {
1650: this .size = size;
1651: }
1652: synchronized (top_journal_lock) {
1653: topJournal().logResourceSizeChange(name, size);
1654: }
1655: }
1656:
1657: /**
1658: * Returns the size of this resource.
1659: */
1660: public long getSize() throws IOException {
1661: synchronized (journal_map) {
1662: return this .size;
1663: }
1664: }
1665:
1666: /**
1667: * Closes the resource. This will actually simply log that the resource
1668: * has been closed.
1669: */
1670: public void close() throws IOException {
1671: synchronized (journal_map) {
1672: data_open = false;
1673: }
1674: }
1675:
1676: /**
1677: * Deletes the resource. This will actually simply log that the resource
1678: * has been deleted.
1679: */
1680: public void delete() throws IOException {
1681: // Log that this resource was deleted.
1682: synchronized (top_journal_lock) {
1683: topJournal().logResourceDelete(name);
1684: }
1685: synchronized (journal_map) {
1686: data_exists = false;
1687: data_deleted = true;
1688: size = 0;
1689: }
1690: }
1691:
1692: /**
1693: * Returns true if the resource currently exists.
1694: */
1695: public boolean exists() {
1696: return data_exists;
1697: }
1698:
1699: }
1700:
1701: /**
1702: * Summary information about a journal.
1703: */
1704: private static class JournalSummary {
1705:
1706: /**
1707: * The JournalFile object that is a summary of.
1708: */
1709: JournalFile journal_file;
1710:
1711: /**
1712: * True if the journal is recoverable (has one or more complete check
1713: * points available).
1714: */
1715: boolean can_be_recovered = false;
1716:
1717: /**
1718: * The position of the last checkpoint in the journal.
1719: */
1720: long last_checkpoint;
1721:
1722: /**
1723: * The list of all resource names that this journal 'touches'.
1724: */
1725: ArrayList resource_list = new ArrayList();
1726:
1727: /**
1728: * Constructor.
1729: */
1730: public JournalSummary(JournalFile journal_file) {
1731: this .journal_file = journal_file;
1732: }
1733:
1734: }
1735:
1736: /**
1737: * Thread that persists the journal in the backgroudn.
1738: */
1739: private class JournalingThread extends Thread {
1740:
1741: private boolean finished = false;
1742: private boolean actually_finished;
1743:
1744: /**
1745: * Constructor.
1746: */
1747: JournalingThread() {
1748: setName("Mckoi - Background Journaling");
1749: // This is a daemon thread. it should be safe if this thread
1750: // dies at any time.
1751: setDaemon(true);
1752: }
1753:
1754: public void run() {
1755: boolean local_finished = false;
1756:
1757: while (!local_finished) {
1758:
1759: ArrayList to_process = null;
1760: synchronized (top_journal_lock) {
1761: if (journal_archives.size() > 0) {
1762: to_process = new ArrayList();
1763: to_process.addAll(journal_archives);
1764: }
1765: }
1766:
1767: if (to_process == null) {
1768: // Nothing to process so wait
1769: synchronized (this ) {
1770: if (!finished) {
1771: try {
1772: wait();
1773: } catch (InterruptedException e) { /* ignore */
1774: }
1775: }
1776: }
1777:
1778: } else if (to_process.size() > 0) {
1779: // Something to process, so go ahead and process the journals,
1780: int sz = to_process.size();
1781: // For all journals
1782: for (int i = 0; i < sz; ++i) {
1783: // Pick the lowest journal to persist
1784: JournalFile jf = (JournalFile) to_process
1785: .get(i);
1786: try {
1787: // Persist the journal
1788: jf.persist(8, jf.size());
1789: // Close and then delete the journal file
1790: jf.closeAndDelete();
1791: } catch (IOException e) {
1792: debug.write(Lvl.ERROR, this ,
1793: "Error persisting journal: " + jf);
1794: debug.writeException(Lvl.ERROR, e);
1795: // If there is an error persisting the best thing to do is
1796: // finish
1797: synchronized (this ) {
1798: finished = true;
1799: }
1800: }
1801: }
1802: }
1803:
1804: synchronized (this ) {
1805: local_finished = finished;
1806: // Remove the journals that we have just persisted.
1807: if (to_process != null) {
1808: synchronized (top_journal_lock) {
1809: int sz = to_process.size();
1810: for (int i = 0; i < sz; ++i) {
1811: journal_archives.remove(0);
1812: }
1813: }
1814: }
1815: // Notify any threads waiting
1816: notifyAll();
1817: }
1818:
1819: }
1820:
1821: synchronized (this ) {
1822: actually_finished = true;
1823: notifyAll();
1824: }
1825: }
1826:
1827: public synchronized void finish() {
1828: finished = true;
1829: notifyAll();
1830: }
1831:
1832: public synchronized void waitUntilFinished() {
1833: try {
1834: while (!actually_finished) {
1835: wait();
1836: }
1837: } catch (InterruptedException e) {
1838: throw new Error("Interrupted: " + e.getMessage());
1839: }
1840: }
1841:
1842: /**
1843: * Persists the journal_archives list until the list is at least the
1844: * given size.
1845: */
1846: public synchronized void persistArchives(int until_size) {
1847: notifyAll();
1848: int sz;
1849: synchronized (top_journal_lock) {
1850: sz = journal_archives.size();
1851: }
1852: // Wait until the sz is smaller than 'until_size'
1853: while (sz > until_size) {
1854: try {
1855: wait();
1856: } catch (InterruptedException e) { /* ignore */
1857: }
1858:
1859: synchronized (top_journal_lock) {
1860: sz = journal_archives.size();
1861: }
1862: }
1863: }
1864:
1865: }
1866:
1867: }
|