001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.commons.transaction.memory;
018:
019: import java.io.PrintWriter;
020: import java.util.HashSet;
021: import java.util.Iterator;
022: import java.util.Map;
023: import java.util.Set;
024: import java.util.Collections;
025:
026: import org.apache.commons.transaction.locking.ReadWriteLock;
027: import org.apache.commons.transaction.util.LoggerFacade;
028: import org.apache.commons.transaction.util.PrintWriterLogger;
029:
030: /**
031: * Wrapper that adds transactional control to all kinds of maps that implement the {@link Map} interface. By using
032: * a naive optimistic transaction control this wrapper has better isolation than {@link TransactionalMapWrapper}, but
033: * may also fail to commit.
034: *
035: * <br>
036: * Start a transaction by calling {@link #startTransaction()}. Then perform the normal actions on the map and
037: * finally either call {@link #commitTransaction()} to make your changes permanent or {@link #rollbackTransaction()} to
038: * undo them.
039: * <br>
040: * <em>Caution:</em> Do not modify values retrieved by {@link #get(Object)} as this will circumvent the transactional mechanism.
041: * Rather clone the value or copy it in a way you see fit and store it back using {@link #put(Object, Object)}.
042: * <br>
043: * <em>Note:</em> This wrapper guarantees isolation level <code>SERIALIZABLE</code>.
044: * <br>
045: * <em>Caution:</em> This implementation might be slow when large amounts of data are changed in a transaction, as many references will need to be copied around.
046: *
047: * @version $Id: OptimisticMapWrapper.java 493628 2007-01-07 01:42:48Z joerg $
048: * @see TransactionalMapWrapper
049: * @see PessimisticMapWrapper
050: */
051: public class OptimisticMapWrapper extends TransactionalMapWrapper {
052:
053: protected static final int COMMIT_TIMEOUT = 1000 * 60; // 1 minute
054: protected static final int ACCESS_TIMEOUT = 1000 * 30; // 30 seconds
055:
056: protected Set activeTransactions;
057:
058: protected LoggerFacade logger;
059:
060: protected ReadWriteLock commitLock;
061:
/**
 * Creates a new optimistic transactional map wrapper whose temporary maps and
 * sets are produced by a {@link HashMapFactory} and a {@link HashSetFactory}.
 *
 * @param wrapped map to be wrapped
 */
public OptimisticMapWrapper(Map wrapped) {
    // delegate to the factory-based constructor with the default factories
    this(wrapped, new HashMapFactory(), new HashSetFactory());
}
071:
/**
 * Creates a new optimistic transactional map wrapper whose temporary maps and
 * sets are obtained from the given {@link MapFactory} and {@link SetFactory}.
 * Logging goes to a {@link PrintWriterLogger} that writes to
 * <code>System.out</code> and is named after this class.
 *
 * @param wrapped map to be wrapped
 * @param mapFactory factory for temporary maps
 * @param setFactory factory for temporary sets
 */
public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) {
    this(wrapped, mapFactory, setFactory,
            new PrintWriterLogger(
                    new PrintWriter(System.out),
                    OptimisticMapWrapper.class.getName(),
                    false));
}
086:
/**
 * Creates a new optimistic transactional map wrapper whose temporary maps and
 * sets are obtained from the given {@link MapFactory} and {@link SetFactory}.
 * Besides delegating to the superclass constructor, this sets up the set of
 * active transactions and the commit lock.
 *
 * @param wrapped map to be wrapped
 * @param mapFactory factory for temporary maps
 * @param setFactory factory for temporary sets
 * @param logger generic logger used for all kinds of logging
 */
public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory,
        LoggerFacade logger) {
    super(wrapped, mapFactory, setFactory);
    this.logger = logger;
    this.commitLock = new ReadWriteLock("COMMIT", logger);
    // synchronized wrapper: the set is touched by every thread that runs a transaction
    this.activeTransactions = Collections.synchronizedSet(new HashSet());
}
104:
105: public void startTransaction() {
106: if (getActiveTx() != null) {
107: throw new IllegalStateException("Active thread "
108: + Thread.currentThread()
109: + " already associated with a transaction!");
110: }
111: CopyingTxContext context = new CopyingTxContext();
112: activeTransactions.add(context);
113: setActiveTx(context);
114: }
115:
116: public void rollbackTransaction() {
117: TxContext txContext = getActiveTx();
118: super .rollbackTransaction();
119: activeTransactions.remove(txContext);
120: }
121:
122: public void commitTransaction() throws ConflictException {
123: commitTransaction(false);
124: }
125:
126: public void commitTransaction(boolean force)
127: throws ConflictException {
128: TxContext txContext = getActiveTx();
129:
130: if (txContext == null) {
131: throw new IllegalStateException("Active thread "
132: + Thread.currentThread()
133: + " not associated with a transaction!");
134: }
135:
136: if (txContext.status == STATUS_MARKED_ROLLBACK) {
137: throw new IllegalStateException("Active thread "
138: + Thread.currentThread()
139: + " is marked for rollback!");
140: }
141:
142: try {
143: // in this final commit phase we need to be the only one access the map
144: // to make sure no one adds an entry after we checked for conflicts
145: commitLock.acquireWrite(txContext, COMMIT_TIMEOUT);
146:
147: if (!force) {
148: Object conflictKey = checkForConflicts();
149: if (conflictKey != null) {
150: throw new ConflictException(conflictKey);
151: }
152: }
153:
154: activeTransactions.remove(txContext);
155: copyChangesToConcurrentTransactions();
156: super .commitTransaction();
157:
158: } catch (InterruptedException e) {
159: // XXX a bit dirty ;)
160: throw new ConflictException(e);
161: } finally {
162: commitLock.release(txContext);
163: }
164: }
165:
166: // TODO: Shouldn't we return a collection rather than a single key here?
167: public Object checkForConflicts() {
168: CopyingTxContext txContext = (CopyingTxContext) getActiveTx();
169:
170: Set keys = txContext.changedKeys();
171: Set externalKeys = txContext.externalChangedKeys();
172:
173: for (Iterator it2 = keys.iterator(); it2.hasNext();) {
174: Object key = it2.next();
175: if (externalKeys.contains(key)) {
176: return key;
177: }
178: }
179: return null;
180: }
181:
182: protected void copyChangesToConcurrentTransactions() {
183: CopyingTxContext this TxContext = (CopyingTxContext) getActiveTx();
184:
185: synchronized (activeTransactions) {
186: for (Iterator it = activeTransactions.iterator(); it
187: .hasNext();) {
188: CopyingTxContext otherTxContext = (CopyingTxContext) it
189: .next();
190:
191: // no need to copy data if the other transaction does not access global map anyway
192: if (otherTxContext.cleared)
193: continue;
194:
195: if (this TxContext.cleared) {
196: // we will clear everything, so we have to copy everything before
197: otherTxContext.externalChanges.putAll(wrapped);
198: } else // no need to check if we have already copied everthing
199: {
200: for (Iterator it2 = this TxContext.changes
201: .entrySet().iterator(); it2.hasNext();) {
202: Map.Entry entry = (Map.Entry) it2.next();
203: Object value = wrapped.get(entry.getKey());
204: if (value != null) {
205: // undo change
206: otherTxContext.externalChanges.put(entry
207: .getKey(), value);
208: } else {
209: // undo add
210: otherTxContext.externalDeletes.add(entry
211: .getKey());
212: }
213: }
214:
215: for (Iterator it2 = this TxContext.deletes
216: .iterator(); it2.hasNext();) {
217: // undo delete
218: Object key = it2.next();
219: Object value = wrapped.get(key);
220: otherTxContext.externalChanges.put(key, value);
221: }
222: }
223: }
224: }
225: }
226:
227: public class CopyingTxContext extends TxContext {
228: protected Map externalChanges;
229: protected Map externalAdds;
230: protected Set externalDeletes;
231:
232: protected CopyingTxContext() {
233: super ();
234: externalChanges = mapFactory.createMap();
235: externalDeletes = setFactory.createSet();
236: externalAdds = mapFactory.createMap();
237: }
238:
239: protected Set externalChangedKeys() {
240: Set keySet = new HashSet();
241: keySet.addAll(externalDeletes);
242: keySet.addAll(externalChanges.keySet());
243: keySet.addAll(externalAdds.keySet());
244: return keySet;
245: }
246:
247: protected Set changedKeys() {
248: Set keySet = new HashSet();
249: keySet.addAll(deletes);
250: keySet.addAll(changes.keySet());
251: keySet.addAll(adds.keySet());
252: return keySet;
253: }
254:
255: protected Set keys() {
256: try {
257: commitLock.acquireRead(this , ACCESS_TIMEOUT);
258: Set keySet = super .keys();
259: keySet.removeAll(externalDeletes);
260: keySet.addAll(externalAdds.keySet());
261: return keySet;
262: } catch (InterruptedException e) {
263: return null;
264: } finally {
265: commitLock.release(this );
266: }
267: }
268:
269: protected Object get(Object key) {
270: try {
271: commitLock.acquireRead(this , ACCESS_TIMEOUT);
272:
273: if (deletes.contains(key)) {
274: // reflects that entry has been deleted in this tx
275: return null;
276: }
277:
278: Object changed = changes.get(key);
279: if (changed != null) {
280: return changed;
281: }
282:
283: Object added = adds.get(key);
284: if (added != null) {
285: return added;
286: }
287:
288: if (cleared) {
289: return null;
290: } else {
291: if (externalDeletes.contains(key)) {
292: // reflects that entry has been deleted in this tx
293: return null;
294: }
295:
296: changed = externalChanges.get(key);
297: if (changed != null) {
298: return changed;
299: }
300:
301: added = externalAdds.get(key);
302: if (added != null) {
303: return added;
304: }
305:
306: // not modified in this tx
307: return wrapped.get(key);
308: }
309: } catch (InterruptedException e) {
310: return null;
311: } finally {
312: commitLock.release(this );
313: }
314: }
315:
316: protected void put(Object key, Object value) {
317: try {
318: commitLock.acquireRead(this , ACCESS_TIMEOUT);
319: super .put(key, value);
320: } catch (InterruptedException e) {
321: } finally {
322: commitLock.release(this );
323: }
324: }
325:
326: protected void remove(Object key) {
327: try {
328: commitLock.acquireRead(this , ACCESS_TIMEOUT);
329: super .remove(key);
330: } catch (InterruptedException e) {
331: } finally {
332: commitLock.release(this );
333: }
334: }
335:
336: protected int size() {
337: try {
338: commitLock.acquireRead(this , ACCESS_TIMEOUT);
339: int size = super .size();
340:
341: size -= externalDeletes.size();
342: size += externalAdds.size();
343:
344: return size;
345: } catch (InterruptedException e) {
346: return -1;
347: } finally {
348: commitLock.release(this );
349: }
350: }
351:
352: protected void clear() {
353: try {
354: commitLock.acquireRead(this , ACCESS_TIMEOUT);
355: super .clear();
356: externalDeletes.clear();
357: externalChanges.clear();
358: externalAdds.clear();
359: } catch (InterruptedException e) {
360: } finally {
361: commitLock.release(this );
362: }
363: }
364:
365: protected void merge() {
366: try {
367: commitLock.acquireRead(this , ACCESS_TIMEOUT);
368: super .merge();
369: } catch (InterruptedException e) {
370: } finally {
371: commitLock.release(this );
372: }
373: }
374:
375: protected void dispose() {
376: try {
377: commitLock.acquireRead(this , ACCESS_TIMEOUT);
378: super .dispose();
379: setFactory.disposeSet(externalDeletes);
380: externalDeletes = null;
381: mapFactory.disposeMap(externalChanges);
382: externalChanges = null;
383: mapFactory.disposeMap(externalAdds);
384: externalAdds = null;
385: } catch (InterruptedException e) {
386: } finally {
387: commitLock.release(this );
388: }
389: }
390:
391: protected void finalize() throws Throwable {
392: activeTransactions.remove(this);
393: super.finalize();
394: }
395: }
396: }
|