001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 1997-2007.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.nativerdf;
007:
008: import java.io.IOException;
009: import java.util.ArrayList;
010: import java.util.Arrays;
011: import java.util.Collections;
012: import java.util.List;
013:
014: import info.aduna.concurrent.locks.Lock;
015: import info.aduna.iteration.CloseableIteration;
016: import info.aduna.iteration.ConvertingIteration;
017: import info.aduna.iteration.DistinctIteration;
018: import info.aduna.iteration.ExceptionConvertingIteration;
019: import info.aduna.iteration.FilterIteration;
020: import info.aduna.iteration.Iterations;
021: import info.aduna.iteration.IteratorIteration;
022: import info.aduna.iteration.LockingIteration;
023:
024: import org.openrdf.OpenRDFUtil;
025: import org.openrdf.model.Namespace;
026: import org.openrdf.model.Resource;
027: import org.openrdf.model.Statement;
028: import org.openrdf.model.URI;
029: import org.openrdf.model.Value;
030: import org.openrdf.model.impl.NamespaceImpl;
031: import org.openrdf.query.BindingSet;
032: import org.openrdf.query.Dataset;
033: import org.openrdf.query.QueryEvaluationException;
034: import org.openrdf.query.algebra.QueryRoot;
035: import org.openrdf.query.algebra.TupleExpr;
036: import org.openrdf.query.algebra.Var;
037: import org.openrdf.query.algebra.evaluation.TripleSource;
038: import org.openrdf.query.algebra.evaluation.impl.BindingAssigner;
039: import org.openrdf.query.algebra.evaluation.impl.CompareOptimizer;
040: import org.openrdf.query.algebra.evaluation.impl.ConjunctiveConstraintSplitter;
041: import org.openrdf.query.algebra.evaluation.impl.ConstantOptimizer;
042: import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
043: import org.openrdf.query.algebra.evaluation.impl.FilterOptimizer;
044: import org.openrdf.query.algebra.evaluation.impl.QueryJoinOptimizer;
045: import org.openrdf.query.algebra.evaluation.impl.QueryModelPruner;
046: import org.openrdf.query.algebra.evaluation.impl.SameTermFilterOptimizer;
047: import org.openrdf.query.algebra.evaluation.util.QueryOptimizerList;
048: import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
049: import org.openrdf.sail.SailException;
050: import org.openrdf.sail.helpers.DefaultSailChangedEvent;
051: import org.openrdf.sail.helpers.SailConnectionBase;
052: import org.openrdf.sail.inferencer.InferencerConnection;
053: import org.openrdf.sail.nativerdf.btree.RecordIterator;
054: import org.openrdf.sail.nativerdf.model.NativeValue;
055:
056: /**
057: * @author Arjohn Kampman
058: */
059: public class NativeStoreConnection extends SailConnectionBase implements
060: InferencerConnection {
061:
062: /*-----------*
063: * Constants *
064: *-----------*/
065:
066: protected final NativeStore nativeStore;
067:
068: /*-----------*
069: * Variables *
070: *-----------*/
071:
072: private DefaultSailChangedEvent sailChangedEvent;
073:
074: /**
075: * The exclusive transaction lock held by this connection during
076: * transactions.
077: */
078: private Lock txnLock;
079:
080: /*--------------*
081: * Constructors *
082: *--------------*/
083:
084: protected NativeStoreConnection(NativeStore nativeStore)
085: throws IOException {
086: super (nativeStore);
087: this .nativeStore = nativeStore;
088: sailChangedEvent = new DefaultSailChangedEvent(nativeStore);
089: }
090:
091: /*---------*
092: * Methods *
093: *---------*/
094:
095: @Override
096: protected void closeInternal() {
097: // FIXME we should check for open iteration objects.
098: }
099:
100: @Override
101: protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(
102: TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
103: boolean includeInferred) throws SailException {
104: // Clone the tuple expression to allow for more aggressive optimizations
105: tupleExpr = tupleExpr.clone();
106:
107: if (!(tupleExpr instanceof QueryRoot)) {
108: // Add a dummy root node to the tuple expressions to allow the
109: // optimizers to modify the actual root node
110: tupleExpr = new QueryRoot(tupleExpr);
111: }
112:
113: Lock readLock = nativeStore.getReadLock();
114:
115: try {
116: replaceValues(tupleExpr);
117:
118: TripleSource tripleSource = new NativeTripleSource(
119: nativeStore, includeInferred, transactionActive());
120: EvaluationStrategyImpl strategy = new EvaluationStrategyImpl(
121: tripleSource, dataset);
122:
123: QueryOptimizerList optimizerList = new QueryOptimizerList();
124: optimizerList.add(new BindingAssigner());
125: optimizerList.add(new ConstantOptimizer(strategy));
126: optimizerList.add(new CompareOptimizer());
127: optimizerList.add(new ConjunctiveConstraintSplitter());
128: optimizerList.add(new SameTermFilterOptimizer());
129: optimizerList.add(new QueryModelPruner());
130: optimizerList.add(new QueryJoinOptimizer());
131: optimizerList.add(new FilterOptimizer());
132:
133: optimizerList.optimize(tupleExpr, dataset, bindings);
134:
135: CloseableIteration<BindingSet, QueryEvaluationException> iter;
136: iter = strategy.evaluate(tupleExpr, bindings);
137: return new LockingIteration<BindingSet, QueryEvaluationException>(
138: readLock, iter);
139: } catch (QueryEvaluationException e) {
140: readLock.release();
141: throw new SailException(e);
142: } catch (RuntimeException e) {
143: readLock.release();
144: throw e;
145: }
146: }
147:
148: protected void replaceValues(TupleExpr tupleExpr)
149: throws SailException {
150: // Replace all Value objects stored in variables with NativeValue objects,
151: // which cache internal IDs
152: tupleExpr.visit(new QueryModelVisitorBase<SailException>() {
153:
154: @Override
155: public void meet(Var var) {
156: if (var.hasValue()) {
157: var.setValue(nativeStore.getValueStore()
158: .getNativeValue(var.getValue()));
159: }
160: }
161: });
162: }
163:
164: @Override
165: protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal()
166: throws SailException {
167: // Which resources are used as context identifiers is not stored
168: // separately. Iterate over all statements and extract their context.
169: Lock readLock = nativeStore.getReadLock();
170: try {
171: // Iterator over all statements
172: CloseableIteration<? extends Statement, IOException> stIter;
173: stIter = nativeStore.createStatementIterator(null, null,
174: null, true, transactionActive());
175:
176: // Filter statements without context resource
177: stIter = new FilterIteration<Statement, IOException>(stIter) {
178:
179: @Override
180: protected boolean accept(Statement st) {
181: return st.getContext() != null;
182: }
183: };
184:
185: // Return the contexts of the statements, filtering any duplicates,
186: // releasing the read lock when the iterator is closed
187: CloseableIteration<? extends Resource, IOException> contextIter;
188: contextIter = new DistinctIteration<Resource, IOException>(
189: new ConvertingIteration<Statement, Resource, IOException>(
190: stIter) {
191:
192: @Override
193: protected Resource convert(Statement st) {
194: return st.getContext();
195: }
196: });
197:
198: contextIter = new LockingIteration<Resource, IOException>(
199: readLock, contextIter);
200:
201: return new ExceptionConvertingIteration<Resource, SailException>(
202: contextIter) {
203:
204: @Override
205: protected SailException convert(Exception e) {
206: if (e instanceof IOException) {
207: return new SailException(e);
208: } else if (e instanceof RuntimeException) {
209: throw (RuntimeException) e;
210: } else if (e == null) {
211: throw new IllegalArgumentException(
212: "e must not be null");
213: } else {
214: throw new IllegalArgumentException(
215: "Unexpected exception type: "
216: + e.getClass());
217: }
218: }
219: };
220: } catch (IOException e) {
221: readLock.release();
222: throw new SailException(e);
223: } catch (RuntimeException e) {
224: readLock.release();
225: throw e;
226: }
227: }
228:
229: @Override
230: protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(
231: Resource subj, URI pred, Value obj,
232: boolean includeInferred, Resource... contexts)
233: throws SailException {
234: Lock readLock = nativeStore.getReadLock();
235: try {
236: CloseableIteration<? extends Statement, IOException> iter;
237: iter = nativeStore.createStatementIterator(subj, pred, obj,
238: includeInferred, transactionActive(), contexts);
239: iter = new LockingIteration<Statement, IOException>(
240: readLock, iter);
241:
242: return new ExceptionConvertingIteration<Statement, SailException>(
243: iter) {
244:
245: @Override
246: protected SailException convert(Exception e) {
247: if (e instanceof IOException) {
248: return new SailException(e);
249: } else if (e instanceof RuntimeException) {
250: throw (RuntimeException) e;
251: } else if (e == null) {
252: throw new IllegalArgumentException(
253: "e must not be null");
254: } else {
255: throw new IllegalArgumentException(
256: "Unexpected exception type: "
257: + e.getClass());
258: }
259: }
260: };
261: } catch (IOException e) {
262: readLock.release();
263: throw new SailException("Unable to get statements", e);
264: } catch (RuntimeException e) {
265: readLock.release();
266: throw e;
267: }
268: }
269:
270: @Override
271: protected long sizeInternal(Resource... contexts)
272: throws SailException {
273: OpenRDFUtil.verifyContextNotNull(contexts);
274:
275: Lock readLock = nativeStore.getReadLock();
276:
277: try {
278: List<Integer> contextIDs;
279: if (contexts.length == 0) {
280: contextIDs = Arrays.asList(NativeValue.UNKNOWN_ID);
281: } else {
282: contextIDs = nativeStore.getContextIDs(contexts);
283: }
284:
285: long size = 0L;
286:
287: for (int contextID : contextIDs) {
288: // Iterate over all explicit statements
289: RecordIterator iter = nativeStore.getTripleStore()
290: .getTriples(-1, -1, -1, contextID, true,
291: transactionActive());
292: try {
293: while (iter.next() != null) {
294: size++;
295: }
296: } finally {
297: iter.close();
298: }
299: }
300:
301: return size;
302: } catch (IOException e) {
303: throw new SailException(e);
304: } finally {
305: readLock.release();
306: }
307: }
308:
309: @Override
310: protected CloseableIteration<? extends Namespace, SailException> getNamespacesInternal()
311: throws SailException {
312: Lock readLock = nativeStore.getReadLock();
313: try {
314: return new LockingIteration<NamespaceImpl, SailException>(
315: readLock,
316: new IteratorIteration<NamespaceImpl, SailException>(
317: nativeStore.getNamespaceStore().iterator()));
318: } catch (RuntimeException e) {
319: readLock.release();
320: throw e;
321: }
322: }
323:
324: @Override
325: protected String getNamespaceInternal(String prefix)
326: throws SailException {
327: Lock readLock = nativeStore.getReadLock();
328: try {
329: return nativeStore.getNamespaceStore().getNamespace(prefix);
330: } finally {
331: readLock.release();
332: }
333: }
334:
335: @Override
336: protected void startTransactionInternal() throws SailException {
337: txnLock = nativeStore.getTransactionLock();
338:
339: try {
340: nativeStore.getTripleStore().startTransaction();
341: } catch (IOException e) {
342: throw new SailException(e);
343: }
344: }
345:
346: @Override
347: protected void commitInternal() throws SailException {
348: Lock storeReadLock = nativeStore.getReadLock();
349:
350: try {
351: nativeStore.getValueStore().sync();
352: nativeStore.getTripleStore().commit();
353: nativeStore.getNamespaceStore().sync();
354:
355: txnLock.release();
356: } catch (IOException e) {
357: throw new SailException(e);
358: } finally {
359: storeReadLock.release();
360: }
361:
362: nativeStore.notifySailChanged(sailChangedEvent);
363:
364: // create a fresh event object.
365: sailChangedEvent = new DefaultSailChangedEvent(nativeStore);
366: }
367:
368: @Override
369: protected void rollbackInternal() throws SailException {
370: Lock storeReadLock = nativeStore.getReadLock();
371:
372: try {
373: nativeStore.getValueStore().sync();
374: nativeStore.getTripleStore().rollback();
375: } catch (IOException e) {
376: throw new SailException(e);
377: } finally {
378: txnLock.release();
379: storeReadLock.release();
380: }
381: }
382:
383: @Override
384: protected void addStatementInternal(Resource subj, URI pred,
385: Value obj, Resource... contexts) throws SailException {
386: addStatement(subj, pred, obj, true, contexts);
387: }
388:
389: public boolean addInferredStatement(Resource subj, URI pred,
390: Value obj, Resource... contexts) throws SailException {
391: Lock conLock = getSharedConnectionLock();
392: try {
393: verifyIsOpen();
394:
395: Lock txnLock = getTransactionLock();
396: try {
397: autoStartTransaction();
398: return addStatement(subj, pred, obj, false, contexts);
399: } finally {
400: txnLock.release();
401: }
402: } finally {
403: conLock.release();
404: }
405: }
406:
407: private boolean addStatement(Resource subj, URI pred, Value obj,
408: boolean explicit, Resource... contexts)
409: throws SailException {
410: OpenRDFUtil.verifyContextNotNull(contexts);
411:
412: boolean result = false;
413:
414: try {
415: ValueStore valueStore = nativeStore.getValueStore();
416: int subjID = valueStore.storeValue(subj);
417: int predID = valueStore.storeValue(pred);
418: int objID = valueStore.storeValue(obj);
419:
420: if (contexts.length == 0) {
421: contexts = new Resource[] { null };
422: }
423:
424: for (Resource context : contexts) {
425: int contextID = 0;
426: if (context != null) {
427: contextID = valueStore.storeValue(context);
428: }
429:
430: boolean wasNew = nativeStore.getTripleStore()
431: .storeTriple(subjID, predID, objID, contextID,
432: explicit);
433: result |= wasNew;
434:
435: if (wasNew) {
436: // The triple was not yet present in the triple store
437: sailChangedEvent.setStatementsAdded(true);
438:
439: if (hasConnectionListeners()) {
440: Statement st;
441:
442: if (context != null) {
443: st = valueStore.createStatement(subj, pred,
444: obj, context);
445: } else {
446: st = valueStore.createStatement(subj, pred,
447: obj);
448: }
449:
450: notifyStatementAdded(st);
451: }
452: }
453: }
454: } catch (IOException e) {
455: throw new SailException(e);
456: }
457:
458: return result;
459: }
460:
461: @Override
462: protected void removeStatementsInternal(Resource subj, URI pred,
463: Value obj, Resource... contexts) throws SailException {
464: removeStatements(subj, pred, obj, true, contexts);
465: }
466:
467: public boolean removeInferredStatement(Resource subj, URI pred,
468: Value obj, Resource... contexts) throws SailException {
469: Lock conLock = getSharedConnectionLock();
470: try {
471: verifyIsOpen();
472:
473: Lock txnLock = getTransactionLock();
474: try {
475: autoStartTransaction();
476: int removeCount = removeStatements(subj, pred, obj,
477: false, contexts);
478: return removeCount > 0;
479: } finally {
480: txnLock.release();
481: }
482: } finally {
483: conLock.release();
484: }
485: }
486:
487: private int removeStatements(Resource subj, URI pred, Value obj,
488: boolean explicit, Resource... contexts)
489: throws SailException {
490: OpenRDFUtil.verifyContextNotNull(contexts);
491:
492: try {
493: TripleStore tripleStore = nativeStore.getTripleStore();
494: ValueStore valueStore = nativeStore.getValueStore();
495:
496: int subjID = NativeValue.UNKNOWN_ID;
497: if (subj != null) {
498: subjID = valueStore.getID(subj);
499: if (subjID == NativeValue.UNKNOWN_ID) {
500: return 0;
501: }
502: }
503: int predID = NativeValue.UNKNOWN_ID;
504: if (pred != null) {
505: predID = valueStore.getID(pred);
506: if (predID == NativeValue.UNKNOWN_ID) {
507: return 0;
508: }
509: }
510: int objID = NativeValue.UNKNOWN_ID;
511: if (obj != null) {
512: objID = valueStore.getID(obj);
513: if (objID == NativeValue.UNKNOWN_ID) {
514: return 0;
515: }
516: }
517:
518: List<Integer> contextIDList = new ArrayList<Integer>(
519: contexts.length);
520: if (contexts.length == 0) {
521: contextIDList.add(NativeValue.UNKNOWN_ID);
522: } else {
523: for (Resource context : contexts) {
524: if (context == null) {
525: contextIDList.add(0);
526: } else {
527: int contextID = valueStore.getID(context);
528: if (contextID != NativeValue.UNKNOWN_ID) {
529: contextIDList.add(contextID);
530: }
531: }
532: }
533: }
534:
535: int removeCount = 0;
536:
537: for (int i = 0; i < contextIDList.size(); i++) {
538: int contextID = contextIDList.get(i);
539:
540: List<Statement> removedStatements = Collections
541: .emptyList();
542:
543: if (hasConnectionListeners()) {
544: // We need to iterate over all matching triples so that they can
545: // be reported
546: RecordIterator btreeIter = tripleStore.getTriples(
547: subjID, predID, objID, contextID, explicit,
548: true);
549:
550: NativeStatementIterator iter = new NativeStatementIterator(
551: btreeIter, valueStore);
552:
553: removedStatements = Iterations.asList(iter);
554: }
555:
556: removeCount += tripleStore.removeTriples(subjID,
557: predID, objID, contextID, explicit);
558:
559: for (Statement st : removedStatements) {
560: notifyStatementRemoved(st);
561: }
562: }
563:
564: if (removeCount > 0) {
565: sailChangedEvent.setStatementsRemoved(true);
566: }
567:
568: return removeCount;
569: } catch (IOException e) {
570: throw new SailException(e);
571: }
572: }
573:
574: @Override
575: protected void clearInternal(Resource... contexts)
576: throws SailException {
577: removeStatements(null, null, null, true, contexts);
578: }
579:
580: public void clearInferred(Resource... contexts)
581: throws SailException {
582: Lock conLock = getSharedConnectionLock();
583: try {
584: verifyIsOpen();
585:
586: Lock txnLock = getTransactionLock();
587: try {
588: autoStartTransaction();
589: removeStatements(null, null, null, false, contexts);
590: } finally {
591: txnLock.release();
592: }
593: } finally {
594: conLock.release();
595: }
596: }
597:
598: public void flushUpdates() {
599: // no-op; changes are reported as soon as they come in
600: }
601:
602: @Override
603: protected void setNamespaceInternal(String prefix, String name)
604: throws SailException {
605: nativeStore.getNamespaceStore().setNamespace(prefix, name);
606: }
607:
608: @Override
609: protected void removeNamespaceInternal(String prefix)
610: throws SailException {
611: nativeStore.getNamespaceStore().removeNamespace(prefix);
612: }
613:
614: @Override
615: protected void clearNamespacesInternal() throws SailException {
616: nativeStore.getNamespaceStore().clear();
617: }
618: }
|