001: /*
002: This source file is part of Smyle, a database library.
003: For up-to-date information, see http://www.drjava.de/smyle
004: Copyright (C) 2001 Stefan Reich (doc@drjava.de)
005:
006: This library is free software; you can redistribute it and/or
007: modify it under the terms of the GNU Lesser General Public
008: License as published by the Free Software Foundation; either
009: version 2.1 of the License, or (at your option) any later version.
010:
011: This library is distributed in the hope that it will be useful,
012: but WITHOUT ANY WARRANTY; without even the implied warranty of
013: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: Lesser General Public License for more details.
015:
016: You should have received a copy of the GNU Lesser General Public
017: License along with this library; if not, write to the Free Software
018: Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019:
020: For full license text, see doc/license/lgpl.txt in this distribution
021: */
022:
023: package drjava.smyle.core;
024:
025: import java.io.*;
026: import java.util.*;
027: import java.lang.Object;
028: import java.lang.ref.*;
029: import org.artsProject.mcop.*;
030: import org.artsProject.mcop.core.*;
031: import org.artsProject.util.*;
032: import drjava.gjutil.*;
033: import drjava.smyle.*;
034: import drjava.smyle.meta.*;
035: import drjava.smyle.core.indexing.*;
036:
037: /** The primary implementation of Store. You don't need to reference this class directly
038: in programs; use the static methods of class Smyle instead. */
039:
040: // IMPORTANT RULE: Don't flush when synchronized on the store!
041: // TODO: follow the rule
042: public class DiskStore implements Store {
043: Disk disk;
044: MasterChunkManager chunkManager;
045: DefaultChunkManager writeChunkManager;
046: ArrayList<WeakReference<SnapshotImpl>> activeSnapshots = new ArrayList<WeakReference<SnapshotImpl>>();
047: int version = VERSION;
048: int references = 1;
049: int timeout = 0;
050: Throwable writeLock;
051: SnapshotImpl writeSnapshot;
052: int waiters = 0;
053: Thread writingThread;
054: PrintWriter logger = new PrintWriter(System.err, true);
055: long gcFrequency = DEFAULT_GC_FREQUENCY;
056: long lastGC;
057: HashMap<String, IndexAdvisor<Pair<IndexProfile, Function>>> indexAdvisors = new HashMap<String, IndexAdvisor<Pair<IndexProfile, Function>>>();
058: SnapshotImpl cachedSnapshot; // an up-to-date immutable snapshot
059: SnapshotImpl unwrittenSnapshot; // modified snapshot that needs to be saved
060: int writeLatency = 0;
061: Thread flusher;
062: boolean deferNextCommit = writeLatency != 0;
063: boolean readOnly;
064:
065: public static boolean debug = false, showGC = true;
066: public static final int VERSION = 890;
067: public static int DEFAULT_GC_FREQUENCY = 100000;
068: static final boolean exclusiveWriteLocking = true; // write locking policy
069:
070: Throwable masterWriteSite;
071:
072: public DiskStore(File dir) {
073: this (new FileSystemDisk(dir, false));
074: }
075:
076: public DiskStore(Disk disk) {
077: this (disk, false);
078: }
079:
080: public DiskStore(Disk disk, boolean readOnly) {
081: try {
082: if (debug)
083: System.out.println("DiskStore init");
084: this .disk = disk;
085: this .readOnly = readOnly;
086:
087: chunkManager = writeChunkManager = new DefaultChunkManager(
088: disk, VERSION);
089:
090: // retrieve store version
091: long master = disk.getMasterFile();
092: if (master != 0) {
093: DataInputStream dis = new DataInputStream(disk
094: .readFile(master));
095: byte[] data = new byte[4];
096: dis.readFully(data);
097: dis.close();
098: version = new Buffer(data).readLong();
099:
100: if (debug)
101: System.out.println("master file=" + master
102: + ", version=" + version + " (VERSION="
103: + VERSION + ")");
104: if (version < VERSION) {
105: log("Upgrading from store version " + version
106: + " to " + VERSION);
107: gcFrequency = 0x4000000000000000L; // disable GC while converting
108: if (version < 25)
109: chunkManager = new ChunkManager02(disk);
110: else if (version < 290)
111: chunkManager = new ChunkManager025(disk);
112: else if (version < 293 && VERSION >= 293)
113: chunkManager = new DefaultChunkManager(disk,
114: version);
115:
116: // snapshot will notice if there are two chunk managers and re-store all chunks
117: try {
118: acquireWriteLock();
119: } catch (InterruptedException e) {
120: throw new InternalSmyleError(e.toString());
121: }
122: SnapshotImpl snapshot = new SnapshotImpl(this ,
123: writeChunkManager, true);
124: snapshot.commit();
125: //snapshot.save(); // why?
126: chunkManager = writeChunkManager;
127: gcFrequency = DEFAULT_GC_FREQUENCY; // re-enable GC
128: }
129:
130: if (debug)
131: System.out.println("Collecting garbage");
132: collectGarbage(false);
133: } else if (readOnly)
134: throw new StoreNotFoundException("No store found in "
135: + disk);
136: } catch (IOException e) {
137: throw new SmyleIOException(e);
138: }
139: }
140:
141: public synchronized void addReference() {
142: ++references;
143: }
144:
145: private void acquireWriteLock() throws InterruptedException {
146: Thread thread = Thread.currentThread();
147: if (writeLock != null || waiters != 0) {
148: if (writingThread == thread) {
149: if (logger != null)
150: writeLock.printStackTrace(logger);
151: throw new MultipleMutableSnapshotsException(
152: "This thread already holds a mutable snapshot - please release before acquiring a new one");
153: }
154: ++waiters;
155: try {
156: if (timeout != 0) {
157: //System.out.println("Thread "+thread+" waiting for "+timeout+" ms");
158: wait(timeout);
159: } else {
160: /*System.out.println("Thread "+thread+" waiting");
161: new Throwable().printStackTrace();*/
162: wait();
163: }
164: } finally {
165: --waiters;
166: }
167: if (writeLock != null) {
168: if (logger != null)
169: writeLock.printStackTrace(logger);
170: throw new TimeoutException(
171: thread
172: + " failed to acquire write lock after waiting "
173: + timeout + " ms");
174: }
175: }
176: writeLock = new Throwable("Current write lock was acquired by "
177: + thread + " here:");
178: writingThread = thread;
179: }
180:
181: private void flush(boolean laterToo) throws InterruptedException {
182: SnapshotImpl s;
183: synchronized (this ) {
184: if (flusher != null) {
185: flusher.interrupt();
186: flusher = null;
187: }
188:
189: if (unwrittenSnapshot != null) {
190: acquireWriteLock();
191: s = unwrittenSnapshot;
192: unwrittenSnapshot = null;
193: } else {
194: if (laterToo && writeLock != null)
195: deferNextCommit = false;
196: return;
197: }
198: }
199: if (s != null)
200: s.save();
201: releaseWriteLock();
202: }
203:
204: synchronized void releaseWriteLock() {
205: cachedSnapshot = null;
206: if (writeSnapshot != null) {
207: if (writeSnapshot.unwritten)
208: unwrittenSnapshot = writeSnapshot;
209: writeSnapshot = null;
210: }
211: writeLock = null;
212: writingThread = null;
213: //System.out.println(Thread.currentThread()+" notify, writeLock="+writeLock);
214: notify();
215: }
216:
217: public synchronized Snapshot mutableSnapshot() {
218: assertOpen();
219: assertWritable();
220:
221: try {
222: acquireWriteLock();
223: } catch (InterruptedException e) {
224: throw new InternalSmyleError(e.toString());
225: }
226: if (unwrittenSnapshot != null) {
227: writeSnapshot = unwrittenSnapshot;
228: writeSnapshot.closed = false;
229: unwrittenSnapshot = null;
230: cachedSnapshot = null;
231: } else
232: writeSnapshot = getSnapshot(true);
233: return writeSnapshot;
234: }
235:
236: private SnapshotImpl getSnapshot(boolean mutable) {
237: SnapshotImpl result = new SnapshotImpl(this , chunkManager,
238: mutable);
239: activeSnapshots.add(new WeakReference<SnapshotImpl>(result));
240: return result;
241: }
242:
243: public synchronized Snapshot snapshot() {
244: assertOpen();
245: return immutableSnapshot();
246: }
247:
248: private SnapshotImpl immutableSnapshot() {
249: try {
250: flush(false);
251: } catch (InterruptedException e) {
252: throw new InternalSmyleError(e.toString());
253: }
254: if (cachedSnapshot == null)
255: return cachedSnapshot = getSnapshot(false);
256: return cachedSnapshot;
257: }
258:
259: public synchronized void close() {
260: assertOpen();
261: if (references == 1) {
262: synchronized (StoreRegistry.class) { // don't want to be reopened while gc runs
263: forgetSnapshots();
264: try {
265: flush(false);
266: } catch (InterruptedException e) {
267: throw new InternalSmyleError(e.toString());
268: }
269: collectGarbage(false);
270:
271: //log(getStats());
272:
273: StoreRegistry.removeStore(this );
274: disk.release();
275: }
276: }
277: --references;
278: }
279:
280: public synchronized String getStats() {
281: return writeChunkManager.getStats();
282: }
283:
284: void assertOpen() throws ClosedStoreException {
285: if (references <= 0)
286: throw new ClosedStoreException("This store has been closed");
287: }
288:
289: void assertWritable() throws ReadOnlyException {
290: if (readOnly)
291: throw new ReadOnlyException(
292: "Store was opened in read-only mode");
293: }
294:
295: /*public void deleteEverything() {
296: disk.deleteEverything();
297: }*/
298:
299: public synchronized void collectGarbage() {
300: collectGarbage(true);
301: }
302:
303: public synchronized void collectGarbage(boolean memGC) {
304: if (readOnly)
305: return;
306: Snapshot snapshot = mutableSnapshot();
307: collectGarbageNoLock(memGC);
308: snapshot.commit();
309: }
310:
311: private void collectGarbageNoLock(boolean memGC) {
312: if (readOnly)
313: return;
314: SnapshotImpl current;
315: ArrayList<WeakReference<SnapshotImpl>> snapshots;
316: synchronized (this ) {
317: current = immutableSnapshot();
318: snapshots = new ArrayList(activeSnapshots);
319: }
320:
321: if (memGC) {
322: // search for any active snapshots that preserve an older state
323: boolean needMemGC = false;
324: for (int i = 0; i < snapshots.size(); i++) {
325: SnapshotImpl snapshot = snapshots.get(i).get();
326: if (snapshot != null && !snapshot.equals(current)) {
327: if (debug)
328: System.out.println("Need mem GC");
329: needMemGC = true;
330: break;
331: }
332: }
333:
334: // free all snapshots that were not forgotten or committed properly
335: if (needMemGC)
336: System.gc();
337: }
338:
339: BitSet whiteList = new BitSet();
340: if (debug)
341: System.out.println("GC: whitelist length for current: "
342: + whiteList.size());
343:
344: synchronized (this ) {
345: if (writingThread != Thread.currentThread())
346: throw new InternalSmyleError("Must have write lock");
347: }
348:
349: for (int i = 0; i < snapshots.size(); i++) {
350: SnapshotImpl snapshot = snapshots.get(i).get();
351: if (snapshot != null)
352: snapshot.collectChunks(whiteList);
353: }
354:
355: //if (debug) System.out.println("GC: disk master="+disk.getMasterFile()+", snapshot master="+current.master+", whitelist length="+whiteList.size());
356: chunkManager.deleteEverythingBut(whiteList);
357: }
358:
359: public synchronized ChunkManager getChunkManager() {
360: return chunkManager;
361: }
362:
363: public synchronized void setTimeout(int ms) {
364: timeout = ms;
365: }
366:
367: public synchronized void logTo(PrintWriter writer) {
368: this .logger = writer;
369: }
370:
371: public synchronized boolean exclusiveWriteLocking() {
372: return exclusiveWriteLocking;
373: }
374:
375: void forgetSnapshot(SnapshotImpl s) {
376: for (int i = 0; i < activeSnapshots.size(); i++) {
377: SnapshotImpl snapshot = activeSnapshots.get(i).get();
378:
379: // use occasion to compact the vector
380: if (snapshot == null || snapshot == s)
381: activeSnapshots.remove(i--);
382: }
383: }
384:
385: void forgetSnapshots() {
386: activeSnapshots.clear();
387: releaseWriteLock();
388: }
389:
390: public synchronized void setGCFrequency(int bytes) {
391: gcFrequency = bytes;
392: }
393:
394: public synchronized void setClusterSize(int bytes) {
395: disk.setClusterSize(bytes);
396: }
397:
398: void maybeGC() {
399: //System.out.println("maybeGC: written="+disk.totalBytesWritten()+", last="+lastGC+", freq="+gcFrequency);
400: boolean needGC = false;
401: synchronized (this ) {
402: if (disk.totalBytesWritten() >= lastGC + gcFrequency) {
403: lastGC = disk.totalBytesWritten();
404: needGC = true;
405: }
406: }
407:
408: if (needGC) {
409: long startTime = System.currentTimeMillis();
410: collectGarbageNoLock(true);
411: long endTime = System.currentTimeMillis();
412:
413: if (showGC)
414: log("GC (" + (endTime - startTime) + " ms)");
415:
416: synchronized (this ) {
417: lastGC = Math.max(lastGC, disk.totalBytesWritten());
418: }
419: }
420: }
421:
422: public synchronized void enableIndexing() {
423: if (indexAdvisors == null) {
424: try {
425: flush(false);
426: } catch (InterruptedException e) {
427: throw new InternalSmyleError(e.toString());
428: }
429: indexAdvisors = new HashMap<String, IndexAdvisor<Pair<IndexProfile, Function>>>();
430: }
431: }
432:
433: synchronized IndexAdvisor<Pair<IndexProfile, Function>> getIndexAdvisor(
434: String table, StructInfo structInfo) {
435: if (indexAdvisors == null)
436: return null; // indexing disabled
437: IndexAdvisor<Pair<IndexProfile, Function>> ia = indexAdvisors
438: .get(table);
439: if (ia == null)
440: indexAdvisors
441: .put(
442: table,
443: ia = new IndexAdvisor<Pair<IndexProfile, Function>>(/*
444: new PairMarDemar<IndexProfile,Function>(new FunctionMarDemar(structInfo))*/));
445: return ia;
446: }
447:
448: synchronized ChunkRef saveMaster(Buffer buffer) {
449: masterWriteSite = new Throwable("Writing master");
450: cachedSnapshot = null;
451: return writeChunkManager.createMasterChunk(buffer);
452: }
453:
454: public synchronized void setWriteLatency(int ms) {
455: writeLatency = ms;
456: deferNextCommit = ms != 0;
457: }
458:
459: synchronized void scheduleFlush() {
460: if (flusher == null) {
461: flusher = new Thread() {
462: public void run() {
463: try {
464: Thread.sleep(writeLatency);
465: synchronized (DiskStore.this ) {
466: if (references <= 0)
467: return;
468: }
469: //System.out.println("Flusher "+this+" flushing");
470: flush(true);
471: } catch (InterruptedException e) {
472: }
473: }
474: };
475: flusher.start();
476: }
477: }
478:
479: public synchronized void kill() {
480: if (flusher != null) {
481: flusher.interrupt();
482: flusher = null;
483: }
484: }
485:
486: synchronized boolean deferThisCommit() {
487: boolean result = deferNextCommit;
488: deferNextCommit = writeLatency != 0;
489: return result;
490: }
491:
492: void log(String msg) {
493: if (logger != null)
494: logger.println("Smyle: " + msg);
495: }
496:
497: public synchronized void optimize() {
498: writeChunkManager.reorderAllChunks();
499: }
500:
501: public synchronized ArrayList<SnapshotImpl> activeSnapshots() {
502: ArrayList<SnapshotImpl> result = new ArrayList<SnapshotImpl>();
503: for (int i = 0; i < activeSnapshots.size(); i++) {
504: SnapshotImpl snapshot = activeSnapshots.get(i).get();
505: if (snapshot != null)
506: result.add(snapshot);
507: }
508: return result;
509: }
510: }
|