001: /*
002: * Copyright James Leigh (c) 2007.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.repository.event.base;
007:
008: import java.util.ArrayList;
009: import java.util.List;
010: import java.util.Set;
011: import java.util.concurrent.CopyOnWriteArraySet;
012:
013: import org.openrdf.model.Namespace;
014: import org.openrdf.model.Resource;
015: import org.openrdf.model.Statement;
016: import org.openrdf.model.URI;
017: import org.openrdf.model.Value;
018: import org.openrdf.repository.Repository;
019: import org.openrdf.repository.RepositoryConnection;
020: import org.openrdf.repository.RepositoryException;
021: import org.openrdf.repository.RepositoryResult;
022: import org.openrdf.repository.base.RepositoryConnectionWrapper;
023: import org.openrdf.repository.event.NotifyingRepositoryConnection;
024: import org.openrdf.repository.event.RepositoryConnectionListener;
025:
026: /**
027: * This broadcaster is used by the RepositoryBroadcaster to wrap the delegate
028: * repository connection. There are two types of listeners for the repository
029: * connection, {@link RepositoryConnectionStateListener} and
030: * {@link RepositoryConnectionListener}. Listeners are notified of changes
031: * after they have occurred.
032: *
033: * @author James Leigh
034: * @author Herko ter Horst
035: */
036: public class NotifyingRepositoryConnectionWrapper extends
037: RepositoryConnectionWrapper implements
038: NotifyingRepositoryConnection {
039:
040: /*-----------*
041: * Variables *
042: *-----------*/
043:
044: private boolean activated = false;
045:
046: private boolean reportDeltas = false;
047:
048: private Set<RepositoryConnectionListener> listeners = new CopyOnWriteArraySet<RepositoryConnectionListener>();
049:
050: /*--------------*
051: * Constructors *
052: *--------------*/
053:
054: public NotifyingRepositoryConnectionWrapper(Repository repository,
055: RepositoryConnection connection) {
056: super (repository, connection);
057: }
058:
059: public NotifyingRepositoryConnectionWrapper(Repository repository,
060: RepositoryConnection connection, boolean reportDeltas) {
061: this (repository, connection);
062: setReportDeltas(reportDeltas);
063: }
064:
065: /*---------*
066: * Methods *
067: *---------*/
068:
069: public boolean reportDeltas() {
070: return reportDeltas;
071: }
072:
073: public void setReportDeltas(boolean reportDeltas) {
074: this .reportDeltas = reportDeltas;
075: }
076:
077: /**
078: * Registers a <tt>RepositoryConnectionListener</tt> that will receive
079: * notifications of operations that are performed on this connection.
080: */
081: public void addRepositoryConnectionListener(
082: RepositoryConnectionListener listener) {
083: listeners.add(listener);
084: activated = true;
085: }
086:
087: /**
088: * Removes a registered <tt>RepositoryConnectionListener</tt> from this
089: * connection.
090: */
091: public void removeRepositoryConnectionListener(
092: RepositoryConnectionListener listener) {
093: listeners.remove(listener);
094: activated = !listeners.isEmpty();
095: }
096:
097: @Override
098: protected boolean isDelegatingAdd() {
099: return !activated;
100: }
101:
102: @Override
103: protected boolean isDelegatingRemove() {
104: return !activated;
105: }
106:
107: @Override
108: public void addWithoutCommit(Resource subject, URI predicate,
109: Value object, Resource... contexts)
110: throws RepositoryException {
111: boolean reportEvent = activated;
112:
113: if (reportEvent && reportDeltas()) {
114: // Only report if the stament is not present yet
115: reportEvent = !getDelegate().hasStatement(subject,
116: predicate, object, false, contexts);
117: }
118:
119: getDelegate().add(subject, predicate, object, contexts);
120:
121: if (reportEvent) {
122: for (RepositoryConnectionListener listener : listeners) {
123: listener
124: .add(this , subject, predicate, object, contexts);
125: }
126: }
127: }
128:
129: @Override
130: public void clear(Resource... contexts) throws RepositoryException {
131: if (activated && reportDeltas()) {
132: removeWithoutCommit(null, null, null, contexts);
133: } else if (activated) {
134: getDelegate().clear(contexts);
135: for (RepositoryConnectionListener listener : listeners) {
136: listener.clear(this , contexts);
137: }
138: } else {
139: getDelegate().clear(contexts);
140: }
141: }
142:
143: @Override
144: public void close() throws RepositoryException {
145: super .close();
146:
147: if (activated) {
148: for (RepositoryConnectionListener listener : listeners) {
149: listener.close(this );
150: }
151: }
152: }
153:
154: @Override
155: public void commit() throws RepositoryException {
156: getDelegate().commit();
157:
158: if (activated) {
159: for (RepositoryConnectionListener listener : listeners) {
160: listener.commit(this );
161: }
162: }
163: }
164:
165: @Override
166: public void removeWithoutCommit(Resource subj, URI pred, Value obj,
167: Resource... ctx) throws RepositoryException {
168: if (activated && reportDeltas()) {
169: RepositoryResult<Statement> stmts;
170: stmts = getDelegate().getStatements(subj, pred, obj, false,
171: ctx);
172: List<Statement> list = new ArrayList<Statement>();
173: try {
174: while (stmts.hasNext()) {
175: list.add(stmts.next());
176: }
177: } finally {
178: stmts.close();
179: }
180: getDelegate().remove(subj, pred, obj, ctx);
181: for (RepositoryConnectionListener listener : listeners) {
182: for (Statement stmt : list) {
183: Resource s = stmt.getSubject();
184: URI p = stmt.getPredicate();
185: Value o = stmt.getObject();
186: Resource c = stmt.getContext();
187: listener.remove(this , s, p, o, c);
188: }
189: }
190: } else if (activated) {
191: getDelegate().remove(subj, pred, obj, ctx);
192: for (RepositoryConnectionListener listener : listeners) {
193: listener.remove(this , subj, pred, obj, ctx);
194: }
195: } else {
196: getDelegate().remove(subj, pred, obj, ctx);
197: }
198: }
199:
200: @Override
201: public void removeNamespace(String prefix)
202: throws RepositoryException {
203: getDelegate().removeNamespace(prefix);
204:
205: if (activated) {
206: for (RepositoryConnectionListener listener : listeners) {
207: listener.removeNamespace(this , prefix);
208: }
209: }
210: }
211:
212: @Override
213: public void clearNamespaces() throws RepositoryException {
214: if (activated && reportDeltas()) {
215: RepositoryResult<Namespace> namespaces;
216: namespaces = getDelegate().getNamespaces();
217: List<String> prefix = new ArrayList<String>();
218: try {
219: while (namespaces.hasNext()) {
220: Namespace ns = namespaces.next();
221: prefix.add(ns.getPrefix());
222: }
223: } finally {
224: namespaces.close();
225: }
226: getDelegate().clearNamespaces();
227: for (String p : prefix) {
228: removeNamespace(p);
229: }
230: } else if (activated) {
231: getDelegate().clearNamespaces();
232: for (RepositoryConnectionListener listener : listeners) {
233: listener.clearNamespaces(this );
234: }
235: } else {
236: getDelegate().clearNamespaces();
237: }
238: }
239:
240: @Override
241: public void rollback() throws RepositoryException {
242: getDelegate().rollback();
243:
244: if (activated) {
245: for (RepositoryConnectionListener listener : listeners) {
246: listener.rollback(this );
247: }
248: }
249: }
250:
251: @Override
252: public void setAutoCommit(boolean autoCommit)
253: throws RepositoryException {
254: boolean wasAutoCommit = isAutoCommit();
255: getDelegate().setAutoCommit(autoCommit);
256:
257: if (activated && wasAutoCommit != autoCommit) {
258: for (RepositoryConnectionListener listener : listeners) {
259: listener.setAutoCommit(this , autoCommit);
260: }
261: if (autoCommit) {
262: for (RepositoryConnectionListener listener : listeners) {
263: listener.commit(this );
264: }
265: }
266: }
267: }
268:
269: @Override
270: public void setNamespace(String prefix, String name)
271: throws RepositoryException {
272: getDelegate().setNamespace(prefix, name);
273:
274: if (activated) {
275: for (RepositoryConnectionListener listener : listeners) {
276: listener.setNamespace(this, prefix, name);
277: }
278: }
279: }
280: }
|