Source Code Cross Referenced for MemoryStore.java in  » RSS-RDF » sesame » org » openrdf » sail » memory » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » RSS RDF » sesame » org.openrdf.sail.memory 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright Aduna (http://www.aduna-software.com/) (c) 1997-2007.
003:         *
004:         * Licensed under the Aduna BSD-style license.
005:         */
006:        package org.openrdf.sail.memory;
007:
008:        import java.io.File;
009:        import java.io.IOException;
010:        import java.util.IdentityHashMap;
011:        import java.util.LinkedHashSet;
012:        import java.util.Set;
013:        import java.util.Timer;
014:        import java.util.TimerTask;
015:
016:        import info.aduna.concurrent.locks.ExclusiveLockManager;
017:        import info.aduna.concurrent.locks.Lock;
018:        import info.aduna.concurrent.locks.ReadPrefReadWriteLockManager;
019:        import info.aduna.concurrent.locks.ReadWriteLockManager;
020:        import info.aduna.iteration.CloseableIteration;
021:        import info.aduna.iteration.EmptyIteration;
022:
023:        import org.openrdf.model.Resource;
024:        import org.openrdf.model.Statement;
025:        import org.openrdf.model.URI;
026:        import org.openrdf.model.Value;
027:        import org.openrdf.sail.SailConnection;
028:        import org.openrdf.sail.SailException;
029:        import org.openrdf.sail.helpers.DefaultSailChangedEvent;
030:        import org.openrdf.sail.helpers.SailBase;
031:        import org.openrdf.sail.memory.model.MemResource;
032:        import org.openrdf.sail.memory.model.MemStatement;
033:        import org.openrdf.sail.memory.model.MemStatementIterator;
034:        import org.openrdf.sail.memory.model.MemStatementList;
035:        import org.openrdf.sail.memory.model.MemURI;
036:        import org.openrdf.sail.memory.model.MemValue;
037:        import org.openrdf.sail.memory.model.MemValueFactory;
038:        import org.openrdf.sail.memory.model.ReadMode;
039:        import org.openrdf.sail.memory.model.TxnStatus;
040:
041:        /**
042:         * An implementation of the Sail interface that stores its data in main memory
043:         * and that can use a file for persistent storage. This Sail implementation
044:         * supports single, isolated transactions. This means that changes to the data
045:         * are not visible until a transaction is committed and that concurrent
046:         * transactions are not possible. When another transaction is active, calls to
047:         * <tt>startTransaction()</tt> will block until the active transaction is
048:         * committed or rolled back.
049:         * 
050:         * @author Arjohn Kampman
051:         * @author jeen
052:         */
053:        public class MemoryStore extends SailBase {
054:
055:            /*-----------*
056:             * Constants *
057:             *-----------*/
058:
059:            protected static final String DATA_FILE_NAME = "memorystore.data";
060:
061:            /*-----------*
062:             * Variables *
063:             *-----------*/
064:
065:            /**
066:             * Factory/cache for MemValue objects.
067:             */
068:            private MemValueFactory valueFactory;
069:
070:            /**
071:             * List containing all available statements.
072:             */
073:            private MemStatementList statements;
074:
075:            /**
076:             * Set of all statements that have been affected by a transaction.
077:             */
078:            private IdentityHashMap<MemStatement, MemStatement> txnStatements;
079:
080:            /**
081:             * Identifies the current snapshot.
082:             */
083:            private int currentSnapshot;
084:
085:            /**
086:             * Store for namespace prefix info.
087:             */
088:            private MemNamespaceStore namespaceStore;
089:
090:            /**
091:             * Lock manager used to give the snapshot cleanup thread exclusive access to
092:             * the statement list.
093:             */
094:            private ReadWriteLockManager statementListLockManager;
095:
096:            /**
097:             * Lock manager used to prevent concurrent transactions.
098:             */
099:            private ExclusiveLockManager txnLockManager;
100:
101:            /**
102:             * Flag indicating whether the Sail has been initialized.
103:             */
104:            private boolean initialized = false;
105:
106:            private boolean persist = false;
107:
108:            /**
109:             * The file used for data persistence, null if this is a volatile RDF store.
110:             */
111:            private File dataFile;
112:
113:            /**
114:             * Flag indicating whether the contents of this repository have changed.
115:             */
116:            private boolean contentsChanged;
117:
118:            /**
119:             * The sync delay.
120:             * 
121:             * @see #setSyncDelay
122:             */
123:            private long syncDelay = 0L;
124:
125:            /**
126:             * Semaphore used to synchronize concurrent access to {@link #sync()}.
127:             */
128:            private final Object syncSemaphore = new Object();
129:
130:            /**
131:             * The timer used to trigger file synchronization.
132:             */
133:            private Timer syncTimer;
134:
135:            /**
136:             * The currently scheduled timer task, if any.
137:             */
138:            private TimerTask syncTimerTask;
139:
140:            /**
141:             * Semaphore used to synchronize concurrent access to {@link #syncTimer} and
142:             * {@link #syncTimerTask}.
143:             */
144:            private final Object syncTimerSemaphore = new Object();
145:
146:            /**
147:             * Cleanup thread that removes deprecated statements when no other threads
148:             * are accessing this list. Seee {@link #scheduleSnapshotCleanup()}.
149:             */
150:            private Thread snapshotCleanupThread;
151:
152:            /**
153:             * Semaphore used to synchronize concurrent access to
154:             * {@link #snapshotCleanupThread}.
155:             */
156:            private final Object snapshotCleanupThreadSemaphore = new Object();
157:
158:            private boolean trackLocks = false;
159:
160:            /*--------------*
161:             * Constructors *
162:             *--------------*/
163:
164:            /**
165:             * Creates a new MemoryStore.
166:             */
167:            public MemoryStore() {
168:            }
169:
170:            /**
171:             * Creates a new persistent MemoryStore. If the specified data directory
172:             * contains an existing store, its contents will be restored upon
173:             * initialization.
174:             * 
175:             * @param dataDir
176:             *        the data directory to be used for persistence.
177:             */
178:            public MemoryStore(File dataDir) {
179:                setDataDir(dataDir);
180:                setPersist(true);
181:            }
182:
183:            /*---------*
184:             * Methods *
185:             *---------*/
186:
187:            @Override
188:            public void setDataDir(File dataDir) {
189:                if (isInitialized()) {
190:                    throw new IllegalStateException(
191:                            "sail has already been initialized");
192:                }
193:
194:                super .setDataDir(dataDir);
195:            }
196:
197:            public void setPersist(boolean persist) {
198:                if (isInitialized()) {
199:                    throw new IllegalStateException(
200:                            "sail has already been initialized");
201:                }
202:
203:                this .persist = persist;
204:            }
205:
206:            public boolean getPersist() {
207:                return persist;
208:            }
209:
210:            /**
211:             * Sets the time (in milliseconds) to wait after a transaction was commited
212:             * before writing the changed data to file. Setting this variable to 0 will
213:             * force a file sync immediately after each commit. A negative value will
214:             * deactivate file synchronization until the Sail is shut down. A positive
215:             * value will postpone the synchronization for at least that amount of
216:             * milliseconds. If in the meantime a new transaction is started, the file
217:             * synchronization will be rescheduled to wait for another <tt>syncDelay</tt>
218:             * ms. This way, bursts of transaction events can be combined in one file
219:             * sync.
220:             * <p>
221:             * The default value for this parameter is <tt>0</tt> (immediate
222:             * synchronization).
223:             * 
224:             * @param syncDelay
225:             *        The sync delay in milliseconds.
226:             */
227:            public void setSyncDelay(long syncDelay) {
228:                if (isInitialized()) {
229:                    throw new IllegalStateException(
230:                            "sail has already been initialized");
231:                }
232:
233:                this .syncDelay = syncDelay;
234:            }
235:
236:            /**
237:             * Gets the currently configured sync delay.
238:             * 
239:             * @return syncDelay The sync delay in milliseconds.
240:             * @see #setSyncDelay
241:             */
242:            public long getSyncDelay() {
243:                return syncDelay;
244:            }
245:
246:            /**
247:             * Initializes this repository. If a persistence file is defined for the
248:             * store, the contents will be restored.
249:             * 
250:             * @throws SailException
251:             *         when initialization of the store failed.
252:             */
253:            public void initialize() throws SailException {
254:                if (isInitialized()) {
255:                    throw new IllegalStateException(
256:                            "sail has already been intialized");
257:                }
258:
259:                logger.debug("Initializing MemoryStore...");
260:
261:                statementListLockManager = new ReadPrefReadWriteLockManager(
262:                        trackLocks);
263:                txnLockManager = new ExclusiveLockManager(trackLocks);
264:                namespaceStore = new MemNamespaceStore();
265:
266:                valueFactory = new MemValueFactory();
267:                statements = new MemStatementList(256);
268:                currentSnapshot = 1;
269:
270:                if (persist) {
271:                    dataFile = new File(getDataDir(), DATA_FILE_NAME);
272:
273:                    if (dataFile.exists()) {
274:                        logger.debug("Reading data from {}...", dataFile);
275:
276:                        // Initialize persistent store from file
277:                        if (!dataFile.canRead()) {
278:                            logger.error("Data file is not readable: {}",
279:                                    dataFile);
280:                            throw new SailException("Can't read data file: "
281:                                    + dataFile);
282:                        }
283:                        // Don't try to read empty files: this will result in an
284:                        // IOException, and the file doesn't contain any data anyway.
285:                        if (dataFile.length() == 0L) {
286:                            logger.warn("Ignoring empty data file: {}",
287:                                    dataFile);
288:                        } else {
289:                            try {
290:                                FileIO.read(this , dataFile);
291:                                logger.debug("Data file read successfully");
292:                            } catch (IOException e) {
293:                                logger.error("Failed to read data file", e);
294:                                throw new SailException(e);
295:                            }
296:                        }
297:                    } else {
298:                        // file specified that does not exist yet, create it
299:                        try {
300:                            File dir = dataFile.getParentFile();
301:                            if (dir != null && !dir.exists()) {
302:                                logger
303:                                        .debug("Creating directory for data file...");
304:                                if (!dir.mkdirs()) {
305:                                    logger
306:                                            .debug(
307:                                                    "Failed to create directory for data file: {}",
308:                                                    dir);
309:                                    throw new SailException(
310:                                            "Failed to create directory for data file: "
311:                                                    + dir);
312:                                }
313:                            }
314:
315:                            logger.debug("Initializing data file...");
316:                            FileIO.write(this , dataFile);
317:                            logger.debug("Data file initialized");
318:                        } catch (IOException e) {
319:                            logger.debug("Failed to initialize data file", e);
320:                            throw new SailException(
321:                                    "Failed to initialize data file "
322:                                            + dataFile, e);
323:                        } catch (SailException e) {
324:                            logger.debug("Failed to initialize data file", e);
325:                            throw new SailException(
326:                                    "Failed to initialize data file "
327:                                            + dataFile, e);
328:                        }
329:                    }
330:                }
331:
332:                contentsChanged = false;
333:                initialized = true;
334:
335:                logger.debug("MemoryStore initialized");
336:            }
337:
338:            /**
339:             * Checks whether the Sail has been initialized.
340:             * 
341:             * @return <tt>true</tt> if the Sail has been initialized, <tt>false</tt>
342:             *         otherwise.
343:             */
344:            protected final boolean isInitialized() {
345:                return initialized;
346:            }
347:
348:            @Override
349:            protected void shutDownInternal() throws SailException {
350:                if (isInitialized()) {
351:                    Lock stLock = getStatementsReadLock();
352:
353:                    try {
354:                        cancelSyncTimer();
355:                        sync();
356:
357:                        valueFactory = null;
358:                        statements = null;
359:                        dataFile = null;
360:                        initialized = false;
361:                    } finally {
362:                        stLock.release();
363:                    }
364:                }
365:            }
366:
367:            /**
368:             * Checks whether this Sail object is writable. A MemoryStore is not writable
369:             * if a read-only data file is used.
370:             */
371:            public boolean isWritable() {
372:                // Sail is not writable when it has a data file that is not writable
373:                return dataFile == null || dataFile.canWrite();
374:            }
375:
376:            @Override
377:            protected SailConnection getConnectionInternal()
378:                    throws SailException {
379:                if (!isInitialized()) {
380:                    throw new IllegalStateException("sail not initialized.");
381:                }
382:
383:                return new MemoryStoreConnection(this );
384:            }
385:
386:            public MemValueFactory getValueFactory() {
387:                if (valueFactory == null) {
388:                    throw new IllegalStateException("sail not initialized.");
389:                }
390:
391:                return valueFactory;
392:            }
393:
394:            protected MemNamespaceStore getNamespaceStore() {
395:                return namespaceStore;
396:            }
397:
398:            protected MemStatementList getStatements() {
399:                return statements;
400:            }
401:
402:            protected int getCurrentSnapshot() {
403:                return currentSnapshot;
404:            }
405:
406:            protected Lock getStatementsReadLock() throws SailException {
407:                try {
408:                    return statementListLockManager.getReadLock();
409:                } catch (InterruptedException e) {
410:                    throw new SailException(e);
411:                }
412:            }
413:
414:            protected Lock getTransactionLock() throws SailException {
415:                try {
416:                    return txnLockManager.getExclusiveLock();
417:                } catch (InterruptedException e) {
418:                    throw new SailException(e);
419:                }
420:            }
421:
422:            protected int size() {
423:                return statements.size();
424:            }
425:
426:            /**
427:             * Creates a StatementIterator that contains the statements matching the
428:             * specified pattern of subject, predicate, object, context. Inferred
429:             * statements are excluded when <tt>explicitOnly</tt> is set to
430:             * <tt>true</tt>. Statements from the null context are excluded when
431:             * <tt>namedContextsOnly</tt> is set to <tt>true</tt>. The returned
432:             * StatementIterator will assume the specified read mode.
433:             */
434:            protected <X extends Exception> CloseableIteration<MemStatement, X> createStatementIterator(
435:                    Class<X> excClass, Resource subj, URI pred, Value obj,
436:                    boolean explicitOnly, int snapshot, ReadMode readMode,
437:                    Resource... contexts) {
438:                // Perform look-ups for value-equivalents of the specified values
439:                MemResource memSubj = valueFactory.getMemResource(subj);
440:                if (subj != null && memSubj == null) {
441:                    // non-existent subject
442:                    return new EmptyIteration<MemStatement, X>();
443:                }
444:
445:                MemURI memPred = valueFactory.getMemURI(pred);
446:                if (pred != null && memPred == null) {
447:                    // non-existent predicate
448:                    return new EmptyIteration<MemStatement, X>();
449:                }
450:
451:                MemValue memObj = valueFactory.getMemValue(obj);
452:                if (obj != null && memObj == null) {
453:                    // non-existent object
454:                    return new EmptyIteration<MemStatement, X>();
455:                }
456:
457:                MemResource[] memContexts;
458:                MemStatementList smallestList;
459:
460:                if (contexts.length == 0) {
461:                    memContexts = new MemResource[0];
462:                    smallestList = statements;
463:                } else if (contexts.length == 1 && contexts[0] != null) {
464:                    MemResource memContext = valueFactory
465:                            .getMemResource(contexts[0]);
466:                    if (memContext == null) {
467:                        // non-existent context
468:                        return new EmptyIteration<MemStatement, X>();
469:                    }
470:
471:                    memContexts = new MemResource[] { memContext };
472:                    smallestList = memContext.getContextStatementList();
473:                } else {
474:                    Set<MemResource> contextSet = new LinkedHashSet<MemResource>(
475:                            2 * contexts.length);
476:
477:                    for (Resource context : contexts) {
478:                        MemResource memContext = valueFactory
479:                                .getMemResource(context);
480:                        if (context == null || memContext != null) {
481:                            contextSet.add(memContext);
482:                        }
483:                    }
484:
485:                    if (contextSet.isEmpty()) {
486:                        // no known contexts specified
487:                        return new EmptyIteration<MemStatement, X>();
488:                    }
489:
490:                    memContexts = contextSet.toArray(new MemResource[contextSet
491:                            .size()]);
492:                    smallestList = statements;
493:                }
494:
495:                if (memSubj != null) {
496:                    MemStatementList l = memSubj.getSubjectStatementList();
497:                    if (l.size() < smallestList.size()) {
498:                        smallestList = l;
499:                    }
500:                }
501:
502:                if (memPred != null) {
503:                    MemStatementList l = memPred.getPredicateStatementList();
504:                    if (l.size() < smallestList.size()) {
505:                        smallestList = l;
506:                    }
507:                }
508:
509:                if (memObj != null) {
510:                    MemStatementList l = memObj.getObjectStatementList();
511:                    if (l.size() < smallestList.size()) {
512:                        smallestList = l;
513:                    }
514:                }
515:
516:                return new MemStatementIterator<X>(smallestList, memSubj,
517:                        memPred, memObj, explicitOnly, snapshot, readMode,
518:                        memContexts);
519:            }
520:
521:            protected Statement addStatement(Resource subj, URI pred,
522:                    Value obj, Resource context, boolean explicit)
523:                    throws SailException {
524:                boolean newValueCreated = false;
525:
526:                // Get or create MemValues for the operands
527:                MemResource memSubj = valueFactory.getMemResource(subj);
528:                if (memSubj == null) {
529:                    memSubj = valueFactory.createMemResource(subj);
530:                    newValueCreated = true;
531:                }
532:                MemURI memPred = valueFactory.getMemURI(pred);
533:                if (memPred == null) {
534:                    memPred = valueFactory.createMemURI(pred);
535:                    newValueCreated = true;
536:                }
537:                MemValue memObj = valueFactory.getMemValue(obj);
538:                if (memObj == null) {
539:                    memObj = valueFactory.createMemValue(obj);
540:                    newValueCreated = true;
541:                }
542:                MemResource memContext = valueFactory.getMemResource(context);
543:                if (context != null && memContext == null) {
544:                    memContext = valueFactory.createMemResource(context);
545:                    newValueCreated = true;
546:                }
547:
548:                if (!newValueCreated) {
549:                    // All values were already present in the graph. Possibly, the
550:                    // statement is already present. Check this.
551:                    CloseableIteration<MemStatement, SailException> stIter = createStatementIterator(
552:                            SailException.class, memSubj, memPred, memObj,
553:                            false, currentSnapshot + 1, ReadMode.RAW,
554:                            memContext);
555:
556:                    try {
557:                        if (stIter.hasNext()) {
558:                            // statement is already present, update its transaction
559:                            // status if appropriate
560:                            MemStatement st = stIter.next();
561:
562:                            txnStatements.put(st, st);
563:
564:                            TxnStatus txnStatus = st.getTxnStatus();
565:
566:                            if (txnStatus == TxnStatus.NEUTRAL
567:                                    && !st.isExplicit() && explicit) {
568:                                // Implicit statement is now added explicitly
569:                                st.setTxnStatus(TxnStatus.EXPLICIT);
570:                            } else if (txnStatus == TxnStatus.NEW
571:                                    && !st.isExplicit() && explicit) {
572:                                // Statement was first added implicitly and now
573:                                // explicitly
574:                                st.setExplicit(true);
575:                            } else if (txnStatus == TxnStatus.DEPRECATED) {
576:                                if (st.isExplicit() == explicit) {
577:                                    // Statement was removed but is now re-added
578:                                    st.setTxnStatus(TxnStatus.NEUTRAL);
579:                                } else if (explicit) {
580:                                    // Implicit statement was removed but is now added
581:                                    // explicitly
582:                                    st.setTxnStatus(TxnStatus.EXPLICIT);
583:                                } else {
584:                                    // Explicit statement was removed but can still be
585:                                    // inferred
586:                                    st.setTxnStatus(TxnStatus.INFERRED);
587:                                }
588:
589:                                return st;
590:                            } else if (txnStatus == TxnStatus.INFERRED
591:                                    && st.isExplicit() && explicit) {
592:                                // Explicit statement was removed but is now re-added
593:                                st.setTxnStatus(TxnStatus.NEUTRAL);
594:                            } else if (txnStatus == TxnStatus.ZOMBIE) {
595:                                // Restore zombie statement
596:                                st.setTxnStatus(TxnStatus.NEW);
597:                                st.setExplicit(explicit);
598:
599:                                return st;
600:                            }
601:
602:                            return null;
603:                        }
604:                    } finally {
605:                        stIter.close();
606:                    }
607:                }
608:
609:                // completely new statement
610:                MemStatement st = new MemStatement(memSubj, memPred, memObj,
611:                        memContext, explicit, currentSnapshot + 1,
612:                        TxnStatus.NEW);
613:                statements.add(st);
614:                st.addToComponentLists();
615:
616:                txnStatements.put(st, st);
617:
618:                return st;
619:            }
620:
621:            protected boolean removeStatement(MemStatement st, boolean explicit)
622:                    throws SailException {
623:                boolean statementsRemoved = false;
624:                TxnStatus txnStatus = st.getTxnStatus();
625:
626:                if (txnStatus == TxnStatus.NEUTRAL
627:                        && st.isExplicit() == explicit) {
628:                    // Remove explicit statement
629:                    st.setTxnStatus(TxnStatus.DEPRECATED);
630:                    statementsRemoved = true;
631:                } else if (txnStatus == TxnStatus.NEW
632:                        && st.isExplicit() == explicit) {
633:                    // Statement was added and now removed in the same transaction
634:                    st.setTxnStatus(TxnStatus.ZOMBIE);
635:                    statementsRemoved = true;
636:                } else if (txnStatus == TxnStatus.INFERRED && st.isExplicit()
637:                        && !explicit) {
638:                    // Explicit statement was replaced by inferred statement and this
639:                    // inferred statement is now removed
640:                    st.setTxnStatus(TxnStatus.DEPRECATED);
641:                    statementsRemoved = true;
642:                } else if (txnStatus == TxnStatus.EXPLICIT && !st.isExplicit()
643:                        && explicit) {
644:                    // Inferred statement was replaced by explicit statement, but this is
645:                    // now undone
646:                    st.setTxnStatus(TxnStatus.NEUTRAL);
647:                }
648:
649:                txnStatements.put(st, st);
650:
651:                return statementsRemoved;
652:            }
653:
654:            protected void startTransaction() throws SailException {
655:                cancelSyncTask();
656:
657:                txnStatements = new IdentityHashMap<MemStatement, MemStatement>();
658:            }
659:
660:            protected void commit() throws SailException {
661:                boolean statementsAdded = false;
662:                boolean statementsRemoved = false;
663:                boolean statementsDeprecated = false;
664:
665:                int txnSnapshot = currentSnapshot + 1;
666:
667:                for (MemStatement st : txnStatements.keySet()) {
668:                    TxnStatus txnStatus = st.getTxnStatus();
669:
670:                    if (txnStatus == TxnStatus.NEUTRAL) {
671:                        continue;
672:                    } else if (txnStatus == TxnStatus.NEW) {
673:                        statementsAdded = true;
674:                    } else if (txnStatus == TxnStatus.DEPRECATED) {
675:                        st.setTillSnapshot(txnSnapshot);
676:                        statementsRemoved = true;
677:                    } else if (txnStatus == TxnStatus.ZOMBIE) {
678:                        st.setTillSnapshot(txnSnapshot);
679:                        statementsDeprecated = true;
680:                    } else if (txnStatus == TxnStatus.EXPLICIT
681:                            || txnStatus == TxnStatus.INFERRED) {
682:                        // Deprecate the existing statement...
683:                        st.setTillSnapshot(txnSnapshot);
684:                        statementsDeprecated = true;
685:
686:                        // ...and add a clone with modified explicit/implicit flag
687:                        MemStatement explSt = new MemStatement(st.getSubject(),
688:                                st.getPredicate(), st.getObject(), st
689:                                        .getContext(),
690:                                txnStatus == TxnStatus.EXPLICIT, txnSnapshot);
691:                        statements.add(explSt);
692:                        explSt.addToComponentLists();
693:                    }
694:
695:                    st.setTxnStatus(TxnStatus.NEUTRAL);
696:                }
697:
698:                txnStatements = null;
699:
700:                if (statementsAdded || statementsRemoved
701:                        || statementsDeprecated) {
702:                    currentSnapshot = txnSnapshot;
703:                }
704:
705:                if (statementsAdded || statementsRemoved) {
706:                    contentsChanged = true;
707:                    scheduleSyncTask();
708:
709:                    DefaultSailChangedEvent event = new DefaultSailChangedEvent(
710:                            this );
711:                    event.setStatementsAdded(statementsAdded);
712:                    event.setStatementsRemoved(statementsRemoved);
713:                    notifySailChanged(event);
714:                }
715:
716:                if (statementsDeprecated) {
717:                    scheduleSnapshotCleanup();
718:                }
719:            }
720:
721:            protected void rollback() throws SailException {
722:                logger.debug("rolling back transaction");
723:
724:                int txnSnapshot = currentSnapshot + 1;
725:
726:                for (MemStatement st : txnStatements.keySet()) {
727:                    TxnStatus txnStatus = st.getTxnStatus();
728:                    if (txnStatus == TxnStatus.NEW
729:                            || txnStatus == TxnStatus.ZOMBIE) {
730:                        // Statement has been added during this transaction
731:                        st.setTillSnapshot(txnSnapshot);
732:                    } else if (txnStatus != TxnStatus.NEUTRAL) {
733:                        // Return statement to neutral status
734:                        st.setTxnStatus(TxnStatus.NEUTRAL);
735:                    }
736:                }
737:
738:                txnStatements = null;
739:
740:                scheduleSnapshotCleanup();
741:            }
742:
743:            protected void scheduleSyncTask() throws SailException {
744:                if (!persist) {
745:                    return;
746:                }
747:
748:                if (syncDelay == 0L) {
749:                    // Sync immediately
750:                    sync();
751:                } else if (syncDelay > 0L) {
752:                    synchronized (syncTimerSemaphore) {
753:                        // Sync in syncDelay milliseconds
754:                        if (syncTimer == null) {
755:                            // Create the syncTimer on a deamon thread
756:                            syncTimer = new Timer(
757:                                    "MemoryStore synchronization", true);
758:                        }
759:
760:                        if (syncTimerTask != null) {
761:                            logger.error("syncTimerTask is not null");
762:                        }
763:
764:                        syncTimerTask = new TimerTask() {
765:
766:                            @Override
767:                            public void run() {
768:                                try {
769:                                    Lock stLock = getStatementsReadLock();
770:                                    try {
771:                                        sync();
772:                                    } finally {
773:                                        stLock.release();
774:                                    }
775:                                } catch (SailException e) {
776:                                    logger.warn("Unable to sync on timer", e);
777:                                }
778:                            }
779:                        };
780:
781:                        syncTimer.schedule(syncTimerTask, syncDelay);
782:                    }
783:                }
784:            }
785:
786:            protected void cancelSyncTask() {
787:                synchronized (syncTimerSemaphore) {
788:                    if (syncTimerTask != null) {
789:                        syncTimerTask.cancel();
790:                        syncTimerTask = null;
791:                    }
792:                }
793:            }
794:
795:            protected void cancelSyncTimer() {
796:                synchronized (syncTimerSemaphore) {
797:                    if (syncTimer != null) {
798:                        syncTimer.cancel();
799:                        syncTimer = null;
800:                    }
801:                }
802:            }
803:
804:            /**
805:             * Synchronizes the contents of this repository with the data that is stored
806:             * on disk. Data will only be written when the contents of the repository and
807:             * data in the file are out of sync.
808:             */
809:            public void sync() throws SailException {
810:                synchronized (syncSemaphore) {
811:                    if (persist && contentsChanged) {
812:                        logger.debug("syncing data to file...");
813:                        try {
814:                            FileIO.write(this , dataFile);
815:                            contentsChanged = false;
816:                            logger.debug("Data synced to file");
817:                        } catch (IOException e) {
818:                            logger.error("Failed to sync to file", e);
819:                            throw new SailException(e);
820:                        }
821:                    }
822:                }
823:            }
824:
825:            /**
826:             * Removes statements from old snapshots from the main statement list and
827:             * resets the snapshot to 1 for the rest of the statements.
828:             * 
829:             * @throws InterruptedException
830:             */
831:            protected void cleanSnapshots() throws InterruptedException {
832:                MemStatementList statements = this .statements;
833:
834:                if (statements == null) {
835:                    // Store has been shut down
836:                    return;
837:                }
838:
839:                Lock stLock = statementListLockManager.getWriteLock();
840:                try {
841:                    for (int i = statements.size() - 1; i >= 0; i--) {
842:                        MemStatement st = statements.get(i);
843:
844:                        if (st.getTillSnapshot() <= currentSnapshot) {
845:                            // stale statement
846:                            st.removeFromComponentLists();
847:                            statements.remove(i);
848:                        } else {
849:                            // Reset snapshot
850:                            st.setSinceSnapshot(1);
851:                        }
852:                    }
853:
854:                    currentSnapshot = 1;
855:                } finally {
856:                    stLock.release();
857:                }
858:            }
859:
860:            protected void scheduleSnapshotCleanup() {
861:                synchronized (snapshotCleanupThreadSemaphore) {
862:                    if (snapshotCleanupThread == null
863:                            || !snapshotCleanupThread.isAlive()) {
864:                        Runnable runnable = new Runnable() {
865:
866:                            public void run() {
867:                                try {
868:                                    cleanSnapshots();
869:                                } catch (InterruptedException e) {
870:                                    logger.warn("snapshot cleanup interrupted");
871:                                }
872:                            }
873:                        };
874:
875:                        snapshotCleanupThread = new Thread(runnable,
876:                                "MemoryStore snapshot cleanup");
877:                        snapshotCleanupThread.setDaemon(true);
878:                        snapshotCleanupThread.start();
879:                    }
880:                }
881:            }
882:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.