001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 1997-2007.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.memory;
007:
008: import java.io.File;
009: import java.io.IOException;
010: import java.util.IdentityHashMap;
011: import java.util.LinkedHashSet;
012: import java.util.Set;
013: import java.util.Timer;
014: import java.util.TimerTask;
015:
016: import info.aduna.concurrent.locks.ExclusiveLockManager;
017: import info.aduna.concurrent.locks.Lock;
018: import info.aduna.concurrent.locks.ReadPrefReadWriteLockManager;
019: import info.aduna.concurrent.locks.ReadWriteLockManager;
020: import info.aduna.iteration.CloseableIteration;
021: import info.aduna.iteration.EmptyIteration;
022:
023: import org.openrdf.model.Resource;
024: import org.openrdf.model.Statement;
025: import org.openrdf.model.URI;
026: import org.openrdf.model.Value;
027: import org.openrdf.sail.SailConnection;
028: import org.openrdf.sail.SailException;
029: import org.openrdf.sail.helpers.DefaultSailChangedEvent;
030: import org.openrdf.sail.helpers.SailBase;
031: import org.openrdf.sail.memory.model.MemResource;
032: import org.openrdf.sail.memory.model.MemStatement;
033: import org.openrdf.sail.memory.model.MemStatementIterator;
034: import org.openrdf.sail.memory.model.MemStatementList;
035: import org.openrdf.sail.memory.model.MemURI;
036: import org.openrdf.sail.memory.model.MemValue;
037: import org.openrdf.sail.memory.model.MemValueFactory;
038: import org.openrdf.sail.memory.model.ReadMode;
039: import org.openrdf.sail.memory.model.TxnStatus;
040:
041: /**
042: * An implementation of the Sail interface that stores its data in main memory
043: * and that can use a file for persistent storage. This Sail implementation
044: * supports single, isolated transactions. This means that changes to the data
045: * are not visible until a transaction is committed and that concurrent
046: * transactions are not possible. When another transaction is active, calls to
047: * <tt>startTransaction()</tt> will block until the active transaction is
048: * committed or rolled back.
049: *
050: * @author Arjohn Kampman
051: * @author jeen
052: */
053: public class MemoryStore extends SailBase {
054:
055: /*-----------*
056: * Constants *
057: *-----------*/
058:
059: protected static final String DATA_FILE_NAME = "memorystore.data";
060:
061: /*-----------*
062: * Variables *
063: *-----------*/
064:
065: /**
066: * Factory/cache for MemValue objects.
067: */
068: private MemValueFactory valueFactory;
069:
070: /**
071: * List containing all available statements.
072: */
073: private MemStatementList statements;
074:
075: /**
076: * Set of all statements that have been affected by a transaction.
077: */
078: private IdentityHashMap<MemStatement, MemStatement> txnStatements;
079:
080: /**
081: * Identifies the current snapshot.
082: */
083: private int currentSnapshot;
084:
085: /**
086: * Store for namespace prefix info.
087: */
088: private MemNamespaceStore namespaceStore;
089:
090: /**
091: * Lock manager used to give the snapshot cleanup thread exclusive access to
092: * the statement list.
093: */
094: private ReadWriteLockManager statementListLockManager;
095:
096: /**
097: * Lock manager used to prevent concurrent transactions.
098: */
099: private ExclusiveLockManager txnLockManager;
100:
101: /**
102: * Flag indicating whether the Sail has been initialized.
103: */
104: private boolean initialized = false;
105:
106: private boolean persist = false;
107:
108: /**
109: * The file used for data persistence, null if this is a volatile RDF store.
110: */
111: private File dataFile;
112:
113: /**
114: * Flag indicating whether the contents of this repository have changed.
115: */
116: private boolean contentsChanged;
117:
118: /**
119: * The sync delay.
120: *
121: * @see #setSyncDelay
122: */
123: private long syncDelay = 0L;
124:
125: /**
126: * Semaphore used to synchronize concurrent access to {@link #sync()}.
127: */
128: private final Object syncSemaphore = new Object();
129:
130: /**
131: * The timer used to trigger file synchronization.
132: */
133: private Timer syncTimer;
134:
135: /**
136: * The currently scheduled timer task, if any.
137: */
138: private TimerTask syncTimerTask;
139:
140: /**
141: * Semaphore used to synchronize concurrent access to {@link #syncTimer} and
142: * {@link #syncTimerTask}.
143: */
144: private final Object syncTimerSemaphore = new Object();
145:
146: /**
147: * Cleanup thread that removes deprecated statements when no other threads
148: * are accessing this list. Seee {@link #scheduleSnapshotCleanup()}.
149: */
150: private Thread snapshotCleanupThread;
151:
152: /**
153: * Semaphore used to synchronize concurrent access to
154: * {@link #snapshotCleanupThread}.
155: */
156: private final Object snapshotCleanupThreadSemaphore = new Object();
157:
158: private boolean trackLocks = false;
159:
160: /*--------------*
161: * Constructors *
162: *--------------*/
163:
/**
 * Constructs a MemoryStore with its default settings. Persistence stays
 * switched off unless it is enabled before the store is initialized.
 */
public MemoryStore() {
    super();
}
169:
/**
 * Constructs a MemoryStore that persists its data in the supplied directory.
 * The data directory is registered and persistence is switched on; any
 * existing store in that directory is restored when the store is
 * initialized.
 * 
 * @param dataDir
 *        the directory in which the data file is kept.
 */
public MemoryStore(File dataDir) {
    super();
    this.setDataDir(dataDir);
    this.setPersist(true);
}
182:
183: /*---------*
184: * Methods *
185: *---------*/
186:
187: @Override
188: public void setDataDir(File dataDir) {
189: if (isInitialized()) {
190: throw new IllegalStateException(
191: "sail has already been initialized");
192: }
193:
194: super .setDataDir(dataDir);
195: }
196:
197: public void setPersist(boolean persist) {
198: if (isInitialized()) {
199: throw new IllegalStateException(
200: "sail has already been initialized");
201: }
202:
203: this .persist = persist;
204: }
205:
206: public boolean getPersist() {
207: return persist;
208: }
209:
210: /**
211: * Sets the time (in milliseconds) to wait after a transaction was commited
212: * before writing the changed data to file. Setting this variable to 0 will
213: * force a file sync immediately after each commit. A negative value will
214: * deactivate file synchronization until the Sail is shut down. A positive
215: * value will postpone the synchronization for at least that amount of
216: * milliseconds. If in the meantime a new transaction is started, the file
217: * synchronization will be rescheduled to wait for another <tt>syncDelay</tt>
218: * ms. This way, bursts of transaction events can be combined in one file
219: * sync.
220: * <p>
221: * The default value for this parameter is <tt>0</tt> (immediate
222: * synchronization).
223: *
224: * @param syncDelay
225: * The sync delay in milliseconds.
226: */
227: public void setSyncDelay(long syncDelay) {
228: if (isInitialized()) {
229: throw new IllegalStateException(
230: "sail has already been initialized");
231: }
232:
233: this .syncDelay = syncDelay;
234: }
235:
236: /**
237: * Gets the currently configured sync delay.
238: *
239: * @return syncDelay The sync delay in milliseconds.
240: * @see #setSyncDelay
241: */
242: public long getSyncDelay() {
243: return syncDelay;
244: }
245:
246: /**
247: * Initializes this repository. If a persistence file is defined for the
248: * store, the contents will be restored.
249: *
250: * @throws SailException
251: * when initialization of the store failed.
252: */
253: public void initialize() throws SailException {
254: if (isInitialized()) {
255: throw new IllegalStateException(
256: "sail has already been intialized");
257: }
258:
259: logger.debug("Initializing MemoryStore...");
260:
261: statementListLockManager = new ReadPrefReadWriteLockManager(
262: trackLocks);
263: txnLockManager = new ExclusiveLockManager(trackLocks);
264: namespaceStore = new MemNamespaceStore();
265:
266: valueFactory = new MemValueFactory();
267: statements = new MemStatementList(256);
268: currentSnapshot = 1;
269:
270: if (persist) {
271: dataFile = new File(getDataDir(), DATA_FILE_NAME);
272:
273: if (dataFile.exists()) {
274: logger.debug("Reading data from {}...", dataFile);
275:
276: // Initialize persistent store from file
277: if (!dataFile.canRead()) {
278: logger.error("Data file is not readable: {}",
279: dataFile);
280: throw new SailException("Can't read data file: "
281: + dataFile);
282: }
283: // Don't try to read empty files: this will result in an
284: // IOException, and the file doesn't contain any data anyway.
285: if (dataFile.length() == 0L) {
286: logger.warn("Ignoring empty data file: {}",
287: dataFile);
288: } else {
289: try {
290: FileIO.read(this , dataFile);
291: logger.debug("Data file read successfully");
292: } catch (IOException e) {
293: logger.error("Failed to read data file", e);
294: throw new SailException(e);
295: }
296: }
297: } else {
298: // file specified that does not exist yet, create it
299: try {
300: File dir = dataFile.getParentFile();
301: if (dir != null && !dir.exists()) {
302: logger
303: .debug("Creating directory for data file...");
304: if (!dir.mkdirs()) {
305: logger
306: .debug(
307: "Failed to create directory for data file: {}",
308: dir);
309: throw new SailException(
310: "Failed to create directory for data file: "
311: + dir);
312: }
313: }
314:
315: logger.debug("Initializing data file...");
316: FileIO.write(this , dataFile);
317: logger.debug("Data file initialized");
318: } catch (IOException e) {
319: logger.debug("Failed to initialize data file", e);
320: throw new SailException(
321: "Failed to initialize data file "
322: + dataFile, e);
323: } catch (SailException e) {
324: logger.debug("Failed to initialize data file", e);
325: throw new SailException(
326: "Failed to initialize data file "
327: + dataFile, e);
328: }
329: }
330: }
331:
332: contentsChanged = false;
333: initialized = true;
334:
335: logger.debug("MemoryStore initialized");
336: }
337:
338: /**
339: * Checks whether the Sail has been initialized.
340: *
341: * @return <tt>true</tt> if the Sail has been initialized, <tt>false</tt>
342: * otherwise.
343: */
344: protected final boolean isInitialized() {
345: return initialized;
346: }
347:
348: @Override
349: protected void shutDownInternal() throws SailException {
350: if (isInitialized()) {
351: Lock stLock = getStatementsReadLock();
352:
353: try {
354: cancelSyncTimer();
355: sync();
356:
357: valueFactory = null;
358: statements = null;
359: dataFile = null;
360: initialized = false;
361: } finally {
362: stLock.release();
363: }
364: }
365: }
366:
367: /**
368: * Checks whether this Sail object is writable. A MemoryStore is not writable
369: * if a read-only data file is used.
370: */
371: public boolean isWritable() {
372: // Sail is not writable when it has a data file that is not writable
373: return dataFile == null || dataFile.canWrite();
374: }
375:
376: @Override
377: protected SailConnection getConnectionInternal()
378: throws SailException {
379: if (!isInitialized()) {
380: throw new IllegalStateException("sail not initialized.");
381: }
382:
383: return new MemoryStoreConnection(this );
384: }
385:
386: public MemValueFactory getValueFactory() {
387: if (valueFactory == null) {
388: throw new IllegalStateException("sail not initialized.");
389: }
390:
391: return valueFactory;
392: }
393:
394: protected MemNamespaceStore getNamespaceStore() {
395: return namespaceStore;
396: }
397:
398: protected MemStatementList getStatements() {
399: return statements;
400: }
401:
402: protected int getCurrentSnapshot() {
403: return currentSnapshot;
404: }
405:
406: protected Lock getStatementsReadLock() throws SailException {
407: try {
408: return statementListLockManager.getReadLock();
409: } catch (InterruptedException e) {
410: throw new SailException(e);
411: }
412: }
413:
414: protected Lock getTransactionLock() throws SailException {
415: try {
416: return txnLockManager.getExclusiveLock();
417: } catch (InterruptedException e) {
418: throw new SailException(e);
419: }
420: }
421:
422: protected int size() {
423: return statements.size();
424: }
425:
426: /**
427: * Creates a StatementIterator that contains the statements matching the
428: * specified pattern of subject, predicate, object, context. Inferred
429: * statements are excluded when <tt>explicitOnly</tt> is set to
430: * <tt>true</tt>. Statements from the null context are excluded when
431: * <tt>namedContextsOnly</tt> is set to <tt>true</tt>. The returned
432: * StatementIterator will assume the specified read mode.
433: */
434: protected <X extends Exception> CloseableIteration<MemStatement, X> createStatementIterator(
435: Class<X> excClass, Resource subj, URI pred, Value obj,
436: boolean explicitOnly, int snapshot, ReadMode readMode,
437: Resource... contexts) {
438: // Perform look-ups for value-equivalents of the specified values
439: MemResource memSubj = valueFactory.getMemResource(subj);
440: if (subj != null && memSubj == null) {
441: // non-existent subject
442: return new EmptyIteration<MemStatement, X>();
443: }
444:
445: MemURI memPred = valueFactory.getMemURI(pred);
446: if (pred != null && memPred == null) {
447: // non-existent predicate
448: return new EmptyIteration<MemStatement, X>();
449: }
450:
451: MemValue memObj = valueFactory.getMemValue(obj);
452: if (obj != null && memObj == null) {
453: // non-existent object
454: return new EmptyIteration<MemStatement, X>();
455: }
456:
457: MemResource[] memContexts;
458: MemStatementList smallestList;
459:
460: if (contexts.length == 0) {
461: memContexts = new MemResource[0];
462: smallestList = statements;
463: } else if (contexts.length == 1 && contexts[0] != null) {
464: MemResource memContext = valueFactory
465: .getMemResource(contexts[0]);
466: if (memContext == null) {
467: // non-existent context
468: return new EmptyIteration<MemStatement, X>();
469: }
470:
471: memContexts = new MemResource[] { memContext };
472: smallestList = memContext.getContextStatementList();
473: } else {
474: Set<MemResource> contextSet = new LinkedHashSet<MemResource>(
475: 2 * contexts.length);
476:
477: for (Resource context : contexts) {
478: MemResource memContext = valueFactory
479: .getMemResource(context);
480: if (context == null || memContext != null) {
481: contextSet.add(memContext);
482: }
483: }
484:
485: if (contextSet.isEmpty()) {
486: // no known contexts specified
487: return new EmptyIteration<MemStatement, X>();
488: }
489:
490: memContexts = contextSet.toArray(new MemResource[contextSet
491: .size()]);
492: smallestList = statements;
493: }
494:
495: if (memSubj != null) {
496: MemStatementList l = memSubj.getSubjectStatementList();
497: if (l.size() < smallestList.size()) {
498: smallestList = l;
499: }
500: }
501:
502: if (memPred != null) {
503: MemStatementList l = memPred.getPredicateStatementList();
504: if (l.size() < smallestList.size()) {
505: smallestList = l;
506: }
507: }
508:
509: if (memObj != null) {
510: MemStatementList l = memObj.getObjectStatementList();
511: if (l.size() < smallestList.size()) {
512: smallestList = l;
513: }
514: }
515:
516: return new MemStatementIterator<X>(smallestList, memSubj,
517: memPred, memObj, explicitOnly, snapshot, readMode,
518: memContexts);
519: }
520:
521: protected Statement addStatement(Resource subj, URI pred,
522: Value obj, Resource context, boolean explicit)
523: throws SailException {
524: boolean newValueCreated = false;
525:
526: // Get or create MemValues for the operands
527: MemResource memSubj = valueFactory.getMemResource(subj);
528: if (memSubj == null) {
529: memSubj = valueFactory.createMemResource(subj);
530: newValueCreated = true;
531: }
532: MemURI memPred = valueFactory.getMemURI(pred);
533: if (memPred == null) {
534: memPred = valueFactory.createMemURI(pred);
535: newValueCreated = true;
536: }
537: MemValue memObj = valueFactory.getMemValue(obj);
538: if (memObj == null) {
539: memObj = valueFactory.createMemValue(obj);
540: newValueCreated = true;
541: }
542: MemResource memContext = valueFactory.getMemResource(context);
543: if (context != null && memContext == null) {
544: memContext = valueFactory.createMemResource(context);
545: newValueCreated = true;
546: }
547:
548: if (!newValueCreated) {
549: // All values were already present in the graph. Possibly, the
550: // statement is already present. Check this.
551: CloseableIteration<MemStatement, SailException> stIter = createStatementIterator(
552: SailException.class, memSubj, memPred, memObj,
553: false, currentSnapshot + 1, ReadMode.RAW,
554: memContext);
555:
556: try {
557: if (stIter.hasNext()) {
558: // statement is already present, update its transaction
559: // status if appropriate
560: MemStatement st = stIter.next();
561:
562: txnStatements.put(st, st);
563:
564: TxnStatus txnStatus = st.getTxnStatus();
565:
566: if (txnStatus == TxnStatus.NEUTRAL
567: && !st.isExplicit() && explicit) {
568: // Implicit statement is now added explicitly
569: st.setTxnStatus(TxnStatus.EXPLICIT);
570: } else if (txnStatus == TxnStatus.NEW
571: && !st.isExplicit() && explicit) {
572: // Statement was first added implicitly and now
573: // explicitly
574: st.setExplicit(true);
575: } else if (txnStatus == TxnStatus.DEPRECATED) {
576: if (st.isExplicit() == explicit) {
577: // Statement was removed but is now re-added
578: st.setTxnStatus(TxnStatus.NEUTRAL);
579: } else if (explicit) {
580: // Implicit statement was removed but is now added
581: // explicitly
582: st.setTxnStatus(TxnStatus.EXPLICIT);
583: } else {
584: // Explicit statement was removed but can still be
585: // inferred
586: st.setTxnStatus(TxnStatus.INFERRED);
587: }
588:
589: return st;
590: } else if (txnStatus == TxnStatus.INFERRED
591: && st.isExplicit() && explicit) {
592: // Explicit statement was removed but is now re-added
593: st.setTxnStatus(TxnStatus.NEUTRAL);
594: } else if (txnStatus == TxnStatus.ZOMBIE) {
595: // Restore zombie statement
596: st.setTxnStatus(TxnStatus.NEW);
597: st.setExplicit(explicit);
598:
599: return st;
600: }
601:
602: return null;
603: }
604: } finally {
605: stIter.close();
606: }
607: }
608:
609: // completely new statement
610: MemStatement st = new MemStatement(memSubj, memPred, memObj,
611: memContext, explicit, currentSnapshot + 1,
612: TxnStatus.NEW);
613: statements.add(st);
614: st.addToComponentLists();
615:
616: txnStatements.put(st, st);
617:
618: return st;
619: }
620:
621: protected boolean removeStatement(MemStatement st, boolean explicit)
622: throws SailException {
623: boolean statementsRemoved = false;
624: TxnStatus txnStatus = st.getTxnStatus();
625:
626: if (txnStatus == TxnStatus.NEUTRAL
627: && st.isExplicit() == explicit) {
628: // Remove explicit statement
629: st.setTxnStatus(TxnStatus.DEPRECATED);
630: statementsRemoved = true;
631: } else if (txnStatus == TxnStatus.NEW
632: && st.isExplicit() == explicit) {
633: // Statement was added and now removed in the same transaction
634: st.setTxnStatus(TxnStatus.ZOMBIE);
635: statementsRemoved = true;
636: } else if (txnStatus == TxnStatus.INFERRED && st.isExplicit()
637: && !explicit) {
638: // Explicit statement was replaced by inferred statement and this
639: // inferred statement is now removed
640: st.setTxnStatus(TxnStatus.DEPRECATED);
641: statementsRemoved = true;
642: } else if (txnStatus == TxnStatus.EXPLICIT && !st.isExplicit()
643: && explicit) {
644: // Inferred statement was replaced by explicit statement, but this is
645: // now undone
646: st.setTxnStatus(TxnStatus.NEUTRAL);
647: }
648:
649: txnStatements.put(st, st);
650:
651: return statementsRemoved;
652: }
653:
654: protected void startTransaction() throws SailException {
655: cancelSyncTask();
656:
657: txnStatements = new IdentityHashMap<MemStatement, MemStatement>();
658: }
659:
660: protected void commit() throws SailException {
661: boolean statementsAdded = false;
662: boolean statementsRemoved = false;
663: boolean statementsDeprecated = false;
664:
665: int txnSnapshot = currentSnapshot + 1;
666:
667: for (MemStatement st : txnStatements.keySet()) {
668: TxnStatus txnStatus = st.getTxnStatus();
669:
670: if (txnStatus == TxnStatus.NEUTRAL) {
671: continue;
672: } else if (txnStatus == TxnStatus.NEW) {
673: statementsAdded = true;
674: } else if (txnStatus == TxnStatus.DEPRECATED) {
675: st.setTillSnapshot(txnSnapshot);
676: statementsRemoved = true;
677: } else if (txnStatus == TxnStatus.ZOMBIE) {
678: st.setTillSnapshot(txnSnapshot);
679: statementsDeprecated = true;
680: } else if (txnStatus == TxnStatus.EXPLICIT
681: || txnStatus == TxnStatus.INFERRED) {
682: // Deprecate the existing statement...
683: st.setTillSnapshot(txnSnapshot);
684: statementsDeprecated = true;
685:
686: // ...and add a clone with modified explicit/implicit flag
687: MemStatement explSt = new MemStatement(st.getSubject(),
688: st.getPredicate(), st.getObject(), st
689: .getContext(),
690: txnStatus == TxnStatus.EXPLICIT, txnSnapshot);
691: statements.add(explSt);
692: explSt.addToComponentLists();
693: }
694:
695: st.setTxnStatus(TxnStatus.NEUTRAL);
696: }
697:
698: txnStatements = null;
699:
700: if (statementsAdded || statementsRemoved
701: || statementsDeprecated) {
702: currentSnapshot = txnSnapshot;
703: }
704:
705: if (statementsAdded || statementsRemoved) {
706: contentsChanged = true;
707: scheduleSyncTask();
708:
709: DefaultSailChangedEvent event = new DefaultSailChangedEvent(
710: this );
711: event.setStatementsAdded(statementsAdded);
712: event.setStatementsRemoved(statementsRemoved);
713: notifySailChanged(event);
714: }
715:
716: if (statementsDeprecated) {
717: scheduleSnapshotCleanup();
718: }
719: }
720:
721: protected void rollback() throws SailException {
722: logger.debug("rolling back transaction");
723:
724: int txnSnapshot = currentSnapshot + 1;
725:
726: for (MemStatement st : txnStatements.keySet()) {
727: TxnStatus txnStatus = st.getTxnStatus();
728: if (txnStatus == TxnStatus.NEW
729: || txnStatus == TxnStatus.ZOMBIE) {
730: // Statement has been added during this transaction
731: st.setTillSnapshot(txnSnapshot);
732: } else if (txnStatus != TxnStatus.NEUTRAL) {
733: // Return statement to neutral status
734: st.setTxnStatus(TxnStatus.NEUTRAL);
735: }
736: }
737:
738: txnStatements = null;
739:
740: scheduleSnapshotCleanup();
741: }
742:
743: protected void scheduleSyncTask() throws SailException {
744: if (!persist) {
745: return;
746: }
747:
748: if (syncDelay == 0L) {
749: // Sync immediately
750: sync();
751: } else if (syncDelay > 0L) {
752: synchronized (syncTimerSemaphore) {
753: // Sync in syncDelay milliseconds
754: if (syncTimer == null) {
755: // Create the syncTimer on a deamon thread
756: syncTimer = new Timer(
757: "MemoryStore synchronization", true);
758: }
759:
760: if (syncTimerTask != null) {
761: logger.error("syncTimerTask is not null");
762: }
763:
764: syncTimerTask = new TimerTask() {
765:
766: @Override
767: public void run() {
768: try {
769: Lock stLock = getStatementsReadLock();
770: try {
771: sync();
772: } finally {
773: stLock.release();
774: }
775: } catch (SailException e) {
776: logger.warn("Unable to sync on timer", e);
777: }
778: }
779: };
780:
781: syncTimer.schedule(syncTimerTask, syncDelay);
782: }
783: }
784: }
785:
786: protected void cancelSyncTask() {
787: synchronized (syncTimerSemaphore) {
788: if (syncTimerTask != null) {
789: syncTimerTask.cancel();
790: syncTimerTask = null;
791: }
792: }
793: }
794:
795: protected void cancelSyncTimer() {
796: synchronized (syncTimerSemaphore) {
797: if (syncTimer != null) {
798: syncTimer.cancel();
799: syncTimer = null;
800: }
801: }
802: }
803:
804: /**
805: * Synchronizes the contents of this repository with the data that is stored
806: * on disk. Data will only be written when the contents of the repository and
807: * data in the file are out of sync.
808: */
809: public void sync() throws SailException {
810: synchronized (syncSemaphore) {
811: if (persist && contentsChanged) {
812: logger.debug("syncing data to file...");
813: try {
814: FileIO.write(this , dataFile);
815: contentsChanged = false;
816: logger.debug("Data synced to file");
817: } catch (IOException e) {
818: logger.error("Failed to sync to file", e);
819: throw new SailException(e);
820: }
821: }
822: }
823: }
824:
825: /**
826: * Removes statements from old snapshots from the main statement list and
827: * resets the snapshot to 1 for the rest of the statements.
828: *
829: * @throws InterruptedException
830: */
831: protected void cleanSnapshots() throws InterruptedException {
832: MemStatementList statements = this .statements;
833:
834: if (statements == null) {
835: // Store has been shut down
836: return;
837: }
838:
839: Lock stLock = statementListLockManager.getWriteLock();
840: try {
841: for (int i = statements.size() - 1; i >= 0; i--) {
842: MemStatement st = statements.get(i);
843:
844: if (st.getTillSnapshot() <= currentSnapshot) {
845: // stale statement
846: st.removeFromComponentLists();
847: statements.remove(i);
848: } else {
849: // Reset snapshot
850: st.setSinceSnapshot(1);
851: }
852: }
853:
854: currentSnapshot = 1;
855: } finally {
856: stLock.release();
857: }
858: }
859:
860: protected void scheduleSnapshotCleanup() {
861: synchronized (snapshotCleanupThreadSemaphore) {
862: if (snapshotCleanupThread == null
863: || !snapshotCleanupThread.isAlive()) {
864: Runnable runnable = new Runnable() {
865:
866: public void run() {
867: try {
868: cleanSnapshots();
869: } catch (InterruptedException e) {
870: logger.warn("snapshot cleanup interrupted");
871: }
872: }
873: };
874:
875: snapshotCleanupThread = new Thread(runnable,
876: "MemoryStore snapshot cleanup");
877: snapshotCleanupThread.setDaemon(true);
878: snapshotCleanupThread.start();
879: }
880: }
881: }
882: }
|