001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 1997-2007.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.helpers;
007:
008: import java.util.ArrayList;
009: import java.util.Collections;
010: import java.util.LinkedList;
011: import java.util.List;
012:
013: import org.slf4j.Logger;
014: import org.slf4j.LoggerFactory;
015:
016: import info.aduna.concurrent.locks.ExclusiveLockManager;
017: import info.aduna.concurrent.locks.Lock;
018: import info.aduna.concurrent.locks.ReadWriteLockManager;
019: import info.aduna.concurrent.locks.WritePrefReadWriteLockManager;
020: import info.aduna.iteration.CloseableIteration;
021:
022: import org.openrdf.model.Namespace;
023: import org.openrdf.model.Resource;
024: import org.openrdf.model.Statement;
025: import org.openrdf.model.URI;
026: import org.openrdf.model.Value;
027: import org.openrdf.query.BindingSet;
028: import org.openrdf.query.Dataset;
029: import org.openrdf.query.QueryEvaluationException;
030: import org.openrdf.query.algebra.TupleExpr;
031: import org.openrdf.sail.SailConnection;
032: import org.openrdf.sail.SailConnectionListener;
033: import org.openrdf.sail.SailException;
034:
035: /**
036: * Abstract Class offering base functionality for SailConnection
037: * implementations.
038: *
039: * @author Arjohn Kampman
040: * @author jeen
041: */
042: public abstract class SailConnectionBase implements SailConnection {
043:
044: protected final Logger logger = LoggerFactory.getLogger(this
045: .getClass());
046:
047: /*-----------*
048: * Variables *
049: *-----------*/
050:
051: private final SailBase sailBase;
052:
053: private boolean isOpen;
054:
055: private boolean txnActive;
056:
057: /**
058: * A read-write lock manager used to handle multi-threaded access on the
059: * connection. Every operation on the connection must first obtain a shared
060: * (read) lock. When close() is invoked on this connection, the close()
061: * method will first obtain an exclusive (write) lock: it will wait until
062: * active operations finish and then block any further operations on the
063: * connection.
064: */
065: private final ReadWriteLockManager connectionLockManager = new WritePrefReadWriteLockManager();
066:
067: /**
068: * A multi-read-single-write lock manager used to handle multi-threaded
069: * access on the transaction-related methods of a connection. Every
070: * transaction operation except commit and rollback must first obtain a
071: * shared (read) lock. The commit and rollback themselves will first obtain
072: * an exclusive (write) lock, which will guarantee that there will be no
073: * updates during these operations.
074: */
075: private final ExclusiveLockManager txnLockManager = new ExclusiveLockManager();
076:
077: // FIXME: use weak references here?
078: private List<SailBaseIteration> activeIterations = Collections
079: .synchronizedList(new LinkedList<SailBaseIteration>());
080:
081: private List<SailConnectionListener> listeners;
082:
083: /*--------------*
084: * Constructors *
085: *--------------*/
086:
087: public SailConnectionBase(SailBase sailBase) {
088: this .sailBase = sailBase;
089: isOpen = true;
090: txnActive = false;
091: listeners = new ArrayList<SailConnectionListener>(0);
092: }
093:
094: /*---------*
095: * Methods *
096: *---------*/
097:
098: public final boolean isOpen() throws SailException {
099: return isOpen;
100: }
101:
102: protected void verifyIsOpen() throws SailException {
103: if (!isOpen) {
104: throw new IllegalStateException(
105: "Connection has been closed");
106: }
107: }
108:
109: public final void close() throws SailException {
110: // obtain an exclusive lock so that any further operations on this
111: // connection (including those from any concurrent threads) are blocked.
112: Lock conLock = getExclusiveConnectionLock();
113:
114: try {
115: if (isOpen) {
116: try {
117: while (true) {
118: SailBaseIteration ci = null;
119:
120: synchronized (activeIterations) {
121: if (activeIterations.isEmpty()) {
122: break;
123: } else {
124: ci = activeIterations.remove(0);
125: }
126: }
127:
128: try {
129: ci.forceClose();
130: } catch (SailException e) {
131: throw e;
132: } catch (Exception e) {
133: throw new SailException(e);
134: }
135: }
136:
137: assert activeIterations.isEmpty();
138:
139: if (txnActive) {
140: logger
141: .warn(
142: "Rolling back transaction due to connection close",
143: new Throwable());
144: try {
145: // Use internal method to avoid deadlock: the public
146: // rollback method will try to obtain a connection lock
147: rollbackInternal();
148: } finally {
149: txnActive = false;
150: }
151: }
152:
153: closeInternal();
154: } finally {
155: isOpen = false;
156: sailBase.connectionClosed(this );
157: }
158: }
159: } finally {
160: // Release the exclusive lock. Any threads waiting to obtain a
161: // non-exclusive read lock will get one and then fail with an
162: // IllegalStateException, because the connection is no longer open.
163: conLock.release();
164: }
165: }
166:
167: public final CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate(
168: TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
169: boolean includeInferred) throws SailException {
170: Lock conLock = getSharedConnectionLock();
171: try {
172: verifyIsOpen();
173: return registerIteration(evaluateInternal(tupleExpr,
174: dataset, bindings, includeInferred));
175: } finally {
176: conLock.release();
177: }
178: }
179:
180: public final CloseableIteration<? extends Resource, SailException> getContextIDs()
181: throws SailException {
182: Lock conLock = getSharedConnectionLock();
183: try {
184: verifyIsOpen();
185: return registerIteration(getContextIDsInternal());
186: } finally {
187: conLock.release();
188: }
189: }
190:
191: public final CloseableIteration<? extends Statement, SailException> getStatements(
192: Resource subj, URI pred, Value obj,
193: boolean includeInferred, Resource... contexts)
194: throws SailException {
195: Lock conLock = getSharedConnectionLock();
196: try {
197: verifyIsOpen();
198: return registerIteration(getStatementsInternal(subj, pred,
199: obj, includeInferred, contexts));
200: } finally {
201: conLock.release();
202: }
203: }
204:
205: public final long size(Resource... contexts) throws SailException {
206: Lock conLock = getSharedConnectionLock();
207: try {
208: verifyIsOpen();
209: return sizeInternal(contexts);
210: } finally {
211: conLock.release();
212: }
213: }
214:
215: protected final boolean transactionActive() {
216: return txnActive;
217: }
218:
219: protected void autoStartTransaction() throws SailException {
220: if (!txnActive) {
221: startTransactionInternal();
222: txnActive = true;
223: }
224: }
225:
226: public final void commit() throws SailException {
227: Lock conLock = getSharedConnectionLock();
228: try {
229: verifyIsOpen();
230:
231: Lock txnLock = getTransactionLock();
232: try {
233: if (txnActive) {
234: commitInternal();
235: txnActive = false;
236: }
237: } finally {
238: txnLock.release();
239: }
240: } finally {
241: conLock.release();
242: }
243: }
244:
245: public final void rollback() throws SailException {
246: Lock conLock = getSharedConnectionLock();
247: try {
248: verifyIsOpen();
249:
250: Lock txnLock = getTransactionLock();
251: try {
252: if (txnActive) {
253: try {
254: rollbackInternal();
255: } finally {
256: txnActive = false;
257: }
258: }
259: } finally {
260: txnLock.release();
261: }
262: } finally {
263: conLock.release();
264: }
265: }
266:
267: public final void addStatement(Resource subj, URI pred, Value obj,
268: Resource... contexts) throws SailException {
269: Lock conLock = getSharedConnectionLock();
270: try {
271: verifyIsOpen();
272:
273: Lock txnLock = getTransactionLock();
274: try {
275: autoStartTransaction();
276: addStatementInternal(subj, pred, obj, contexts);
277: } finally {
278: txnLock.release();
279: }
280: } finally {
281: conLock.release();
282: }
283: }
284:
285: public final void removeStatements(Resource subj, URI pred,
286: Value obj, Resource... contexts) throws SailException {
287: Lock conLock = getSharedConnectionLock();
288: try {
289: verifyIsOpen();
290:
291: Lock txnLock = getTransactionLock();
292: try {
293: autoStartTransaction();
294: removeStatementsInternal(subj, pred, obj, contexts);
295: } finally {
296: txnLock.release();
297: }
298: } finally {
299: conLock.release();
300: }
301: }
302:
303: public final void clear(Resource... contexts) throws SailException {
304: Lock conLock = getSharedConnectionLock();
305: try {
306: verifyIsOpen();
307:
308: Lock txnLock = getTransactionLock();
309: try {
310: autoStartTransaction();
311: clearInternal(contexts);
312: } finally {
313: txnLock.release();
314: }
315: } finally {
316: conLock.release();
317: }
318: }
319:
320: public final CloseableIteration<? extends Namespace, SailException> getNamespaces()
321: throws SailException {
322: Lock conLock = getSharedConnectionLock();
323: try {
324: verifyIsOpen();
325: return registerIteration(getNamespacesInternal());
326: } finally {
327: conLock.release();
328: }
329: }
330:
331: public final String getNamespace(String prefix)
332: throws SailException {
333: Lock conLock = getSharedConnectionLock();
334: try {
335: verifyIsOpen();
336: return getNamespaceInternal(prefix);
337: } finally {
338: conLock.release();
339: }
340: }
341:
342: public final void setNamespace(String prefix, String name)
343: throws SailException {
344: Lock conLock = getSharedConnectionLock();
345: try {
346: verifyIsOpen();
347:
348: Lock txnLock = getTransactionLock();
349: try {
350: autoStartTransaction();
351: setNamespaceInternal(prefix, name);
352: } finally {
353: txnLock.release();
354: }
355: } finally {
356: conLock.release();
357: }
358: }
359:
360: public final void removeNamespace(String prefix)
361: throws SailException {
362: Lock conLock = getSharedConnectionLock();
363: try {
364: verifyIsOpen();
365:
366: Lock txnLock = getTransactionLock();
367: try {
368: autoStartTransaction();
369: removeNamespaceInternal(prefix);
370: } finally {
371: txnLock.release();
372: }
373: } finally {
374: conLock.release();
375: }
376: }
377:
378: public final void clearNamespaces() throws SailException {
379: Lock conLock = getSharedConnectionLock();
380: try {
381: verifyIsOpen();
382:
383: Lock txnLock = getTransactionLock();
384: try {
385: autoStartTransaction();
386: clearNamespacesInternal();
387: } finally {
388: txnLock.release();
389: }
390: } finally {
391: conLock.release();
392: }
393: }
394:
395: public void addConnectionListener(SailConnectionListener listener) {
396: synchronized (listeners) {
397: listeners.add(listener);
398: }
399: }
400:
401: public void removeConnectionListener(SailConnectionListener listener) {
402: synchronized (listeners) {
403: listeners.remove(listener);
404: }
405: }
406:
407: protected boolean hasConnectionListeners() {
408: synchronized (listeners) {
409: return !listeners.isEmpty();
410: }
411: }
412:
413: protected void notifyStatementAdded(Statement st) {
414: synchronized (listeners) {
415: for (SailConnectionListener listener : listeners) {
416: listener.statementAdded(st);
417: }
418: }
419: }
420:
421: protected void notifyStatementRemoved(Statement st) {
422: synchronized (listeners) {
423: for (SailConnectionListener listener : listeners) {
424: listener.statementRemoved(st);
425: }
426: }
427: }
428:
429: protected Lock getSharedConnectionLock() throws SailException {
430: try {
431: return connectionLockManager.getReadLock();
432: } catch (InterruptedException e) {
433: throw new SailException(e);
434: }
435: }
436:
437: protected Lock getExclusiveConnectionLock() throws SailException {
438: try {
439: return connectionLockManager.getWriteLock();
440: } catch (InterruptedException e) {
441: throw new SailException(e);
442: }
443: }
444:
445: protected Lock getTransactionLock() throws SailException {
446: try {
447: return txnLockManager.getExclusiveLock();
448: } catch (InterruptedException e) {
449: throw new SailException(e);
450: }
451: }
452:
453: /**
454: * Registers an iteration as active by wrapping it in a
455: * {@link SailBaseIteration} object and adding it to the list of active
456: * iterations.
457: */
458: protected <T, E extends Exception> CloseableIteration<T, E> registerIteration(
459: CloseableIteration<T, E> iter) {
460: SailBaseIteration<T, E> result = new SailBaseIteration<T, E>(
461: iter, this );
462: activeIterations.add(result);
463: return result;
464: }
465:
466: /**
467: * Called by {@link SailBaseIteration} to indicate that it has been closed.
468: */
469: protected void iterationClosed(SailBaseIteration iter) {
470: activeIterations.remove(iter);
471: }
472:
473: protected abstract void closeInternal() throws SailException;
474:
475: protected abstract CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(
476: TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
477: boolean includeInferred) throws SailException;
478:
479: protected abstract CloseableIteration<? extends Resource, SailException> getContextIDsInternal()
480: throws SailException;
481:
482: protected abstract CloseableIteration<? extends Statement, SailException> getStatementsInternal(
483: Resource subj, URI pred, Value obj,
484: boolean includeInferred, Resource... contexts)
485: throws SailException;
486:
487: protected abstract long sizeInternal(Resource... contexts)
488: throws SailException;
489:
490: protected abstract void startTransactionInternal()
491: throws SailException;
492:
493: protected abstract void commitInternal() throws SailException;
494:
495: protected abstract void rollbackInternal() throws SailException;
496:
497: protected abstract void addStatementInternal(Resource subj,
498: URI pred, Value obj, Resource... contexts)
499: throws SailException;
500:
501: protected abstract void removeStatementsInternal(Resource subj,
502: URI pred, Value obj, Resource... contexts)
503: throws SailException;
504:
505: protected abstract void clearInternal(Resource... contexts)
506: throws SailException;
507:
508: protected abstract CloseableIteration<? extends Namespace, SailException> getNamespacesInternal()
509: throws SailException;
510:
511: protected abstract String getNamespaceInternal(String prefix)
512: throws SailException;
513:
514: protected abstract void setNamespaceInternal(String prefix,
515: String name) throws SailException;
516:
517: protected abstract void removeNamespaceInternal(String prefix)
518: throws SailException;
519:
520: protected abstract void clearNamespacesInternal()
521: throws SailException;
522: }
|