001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.test.transactions;
005:
006: import org.apache.commons.lang.builder.EqualsBuilder;
007:
008: import com.tc.util.Assert;
009: import com.tc.util.stringification.OurStringBuilder;
010:
011: import java.util.HashSet;
012: import java.util.Iterator;
013: import java.util.Set;
014:
015: /**
016: * The standard implementation of {@link TransactionalObject}. See that class for details.
017: *
018: * <p>
019: * This implementation allows reads to also be checked with a slight relaxation of the rules, called <em>slop</em>:
020: * if, when you read, you get a value that's invalid, <em>but</em> would have been valid were the read issued at any
021: * point up to <code>slop</code> milliseconds ago, it's treated as valid. This is used for our tests that involve the
022: * Terracotta database listener, as invalidations take a nonzero amount of time to travel.
023: * </p>
024: * <p>
025: * Be warned: this class's implementation is quite tricky. Make changes with care, and re-run the test whenever you
026: * change it.
027: * </p>
028: * <p>
029: * (The set of all values you could possibly read includes all values the object has had from the current time less the
030: * slop up to the start of the read, in addition to all values for which writes have at least been started (by calling
031: * {@link #startWrite(Object)} for them) before the moment you called {@link #endRead(Context, Object)}.)
032: * </p>
033: * <p>
034: * This class works in a bit of a funny way: if <code>slop</code> is exactly zero, we need to maintain order of method
035: * calls — many method calls can happen without {@link System#currentTimeMillis()} changing at all. To do that, we
036: * simply use a {@link Timing} object, rather than a raw <code>long</code>, to store our times; there's one
037: * implementation (which we use if <code>slop</code> is nonzero) that uses just the raw time from
038: * {@link System#currentTimeMillis()}, while there's another (which we use if <code>slop</code> is exactly zero) that
039: * uses both the raw time and an internally-generated sequence number, to distinguish calls that happen at the same
040: * 'time' (according to {@link System#currentTimeMillis()}).
041: */
042: public class StandardTransactionalObject implements TransactionalObject {
043:
044: private static interface Timing {
045: boolean isAfter(Timing other);
046:
047: boolean isBefore(Timing other);
048:
049: boolean isAfterOrEqual(Timing other);
050:
051: boolean isBeforeOrEqual(Timing other);
052:
053: long rawTime();
054: }
055:
056: private static class SequencedTiming implements Timing {
057: private final long actualTime;
058: private final long sequenceNumber;
059:
060: public SequencedTiming(long actualTime, long sequenceNumber) {
061: this .actualTime = actualTime;
062: this .sequenceNumber = sequenceNumber;
063: }
064:
065: public boolean isAfter(Timing rawOther) {
066: SequencedTiming other = (SequencedTiming) rawOther;
067: if (other == null)
068: return true;
069: return this .actualTime > other.actualTime
070: || (this .actualTime == other.actualTime && this .sequenceNumber > other.sequenceNumber);
071: }
072:
073: public boolean isBefore(Timing rawOther) {
074: SequencedTiming other = (SequencedTiming) rawOther;
075: if (other == null)
076: return false;
077: return this .actualTime < other.actualTime
078: || (this .actualTime == other.actualTime && this .sequenceNumber < other.sequenceNumber);
079: }
080:
081: public boolean isAfterOrEqual(Timing other) {
082: return isAfter(other) || equals(other);
083: }
084:
085: public boolean isBeforeOrEqual(Timing other) {
086: return isBefore(other) || equals(other);
087: }
088:
089: public boolean equals(Object that) {
090: if (!(that instanceof SequencedTiming))
091: return false;
092: SequencedTiming thatTiming = (SequencedTiming) that;
093: return new EqualsBuilder().append(this .actualTime,
094: thatTiming.actualTime).append(this .sequenceNumber,
095: thatTiming.sequenceNumber).isEquals();
096: }
097:
098: public long rawTime() {
099: return this .actualTime;
100: }
101:
102: public String toString() {
103: return new OurStringBuilder(this ,
104: OurStringBuilder.COMPACT_STYLE).append(
105: "actual time", this .actualTime).append(
106: "sequence number", this .sequenceNumber).toString();
107: }
108: }
109:
110: private static class BasicTiming implements Timing {
111: private final long time;
112:
113: public BasicTiming(long time) {
114: this .time = time;
115: }
116:
117: public boolean isAfter(Timing rawOther) {
118: if (rawOther == null)
119: return true;
120: BasicTiming other = (BasicTiming) rawOther;
121: return this .time > other.time;
122: }
123:
124: public boolean isBefore(Timing rawOther) {
125: if (rawOther == null)
126: return false;
127: BasicTiming other = (BasicTiming) rawOther;
128: return this .time < other.time;
129: }
130:
131: public boolean isAfterOrEqual(Timing other) {
132: return isAfter(other) || equals(other);
133: }
134:
135: public boolean isBeforeOrEqual(Timing other) {
136: return isBefore(other) || equals(other);
137: }
138:
139: public boolean equals(Object that) {
140: if (!(that instanceof BasicTiming))
141: return false;
142: BasicTiming thatTiming = (BasicTiming) that;
143: return new EqualsBuilder().append(this .time,
144: thatTiming.time).isEquals();
145: }
146:
147: public long rawTime() {
148: return this .time;
149: }
150:
151: public String toString() {
152: return new OurStringBuilder(this ,
153: OurStringBuilder.COMPACT_STYLE).append("time",
154: this .time).toString();
155: }
156: }
157:
158: private static class Write implements Context {
159: private final Object value;
160: private final Timing startedAt;
161: private Timing committedAt;
162:
163: public Write(Object value, Timing startedAt) {
164: Assert.assertNotNull(startedAt);
165: this .value = value;
166: this .startedAt = startedAt;
167: this .committedAt = null;
168: }
169:
170: public void commit(Timing now) {
171: Assert.eval(this .committedAt == null);
172: this .committedAt = now;
173: }
174:
175: public Timing startedAt() {
176: return this .startedAt;
177: }
178:
179: public Timing committedAt() {
180: return this .committedAt;
181: }
182:
183: public boolean isCommitted() {
184: return this .committedAt != null;
185: }
186:
187: public Object value() {
188: return this .value;
189: }
190:
191: public String toString() {
192: return new OurStringBuilder(this ,
193: OurStringBuilder.COMPACT_STYLE).append("value",
194: value).append("started at", startedAt).append(
195: "committed at", committedAt).toString();
196: }
197: }
198:
199: private static class Read implements Context {
200: private final Timing startedAt;
201:
202: public Read(Timing startedAt) {
203: Assert.eval(startedAt != null);
204: this .startedAt = startedAt;
205: }
206:
207: public Timing startedAt() {
208: return this .startedAt;
209: }
210:
211: public String toString() {
212: return new OurStringBuilder(this ,
213: OurStringBuilder.COMPACT_STYLE).append(
214: "started at", startedAt).toString();
215: }
216: }
217:
218: private static final long DEFAULT_GC_SLOP = 60 * 1000;
219:
220: private final String name;
221: private final long slop;
222: private final long gcSlop;
223: private final Set currentWrites;
224: private final Set currentReads;
225: private long lastSequence;
226:
227: public StandardTransactionalObject(String name, long slop,
228: long gcSlop, Object initialValue, long now) {
229: Assert.assertNotBlank(name);
230: Assert.eval(slop >= 0);
231: Assert.eval(gcSlop >= 0);
232: this .name = name;
233: this .slop = slop;
234: this .gcSlop = gcSlop;
235: this .currentWrites = new HashSet();
236: this .currentReads = new HashSet();
237: this .lastSequence = 0;
238: endWrite(startWrite(initialValue, now), now);
239: }
240:
241: public StandardTransactionalObject(String name, long slop,
242: Object initialValue, long now) {
243: this (name, slop, DEFAULT_GC_SLOP, initialValue, now);
244: }
245:
246: public StandardTransactionalObject(String name, long slop,
247: Object initialValue) {
248: this (name, slop, initialValue, System.currentTimeMillis());
249: }
250:
251: public StandardTransactionalObject(String name,
252: Object initialValue, long now) {
253: this (name, 0, initialValue, now);
254: }
255:
256: public StandardTransactionalObject(String name, Object initialValue) {
257: this (name, 0, initialValue, System.currentTimeMillis());
258: }
259:
260: private Timing createTiming(long time) {
261: return createTiming(time, 0);
262: }
263:
264: private Timing createTiming(long time, long offset) {
265: if (this .slop == 0)
266: return new SequencedTiming(time + offset, nextSequence());
267: else
268: return new BasicTiming(time + offset);
269: }
270:
271: private Timing createTiming(Timing time, long offset) {
272: if (this .slop == 0)
273: return new SequencedTiming(
274: ((SequencedTiming) time).actualTime + offset,
275: nextSequence());
276: else
277: return new BasicTiming(((BasicTiming) time).time + offset);
278: }
279:
280: private synchronized long nextSequence() {
281: return ++this .lastSequence;
282: }
283:
284: public synchronized Context startWrite(Object value) {
285: return startWrite(value, System.currentTimeMillis());
286: }
287:
288: public synchronized Context startWrite(Object value, long now) {
289: Assert.eval(now >= 0);
290:
291: gc(now);
292:
293: Write out = new Write(value, createTiming(now));
294: this .currentWrites.add(out);
295: return out;
296: }
297:
298: public synchronized void endWrite(Context rawWrite) {
299: endWrite(rawWrite, System.currentTimeMillis());
300: }
301:
302: public synchronized void endWrite(Context rawWrite, long now) {
303: Assert.assertNotNull(rawWrite);
304: Assert.eval(now >= 0);
305:
306: gc(now);
307:
308: Write write = (Write) rawWrite;
309: Assert
310: .eval(
311: "You can't commit a write before you started it, buddy.",
312: createTiming(now).isAfterOrEqual(
313: write.startedAt()));
314: write.commit(createTiming(now));
315: }
316:
317: public synchronized Context startRead() {
318: return startRead(System.currentTimeMillis());
319: }
320:
321: public synchronized Context startRead(long now) {
322: Assert.eval(now >= 0);
323:
324: gc(now);
325:
326: Read out = new Read(createTiming(now));
327: this .currentReads.add(out);
328: return out;
329: }
330:
331: public synchronized void endRead(Context rawRead, Object result) {
332: endRead(rawRead, result, System.currentTimeMillis());
333: }
334:
335: public synchronized void endRead(Context rawRead, Object result,
336: long now) {
337: Assert.assertNotNull(rawRead);
338: Assert.eval(now >= 0);
339:
340: gc(now);
341:
342: Read read = (Read) rawRead;
343:
344: Set withoutTooOld = removeNotInEffectAsOfTime(
345: this .currentWrites, createTiming(read.startedAt(),
346: -this .slop));
347: Set potentials = removeTooYoung(withoutTooOld,
348: createTiming(now));
349:
350: // System.err.println("endRead(" + rawRead + ", " + result + ", " + now + "),: source " + this.currentWrites
351: // + ", potentials " + potentials);
352:
353: this .currentReads.remove(read);
354:
355: Iterator iter = potentials.iterator();
356: while (iter.hasNext()) {
357: if (valueEquals(result, ((Write) iter.next()).value()))
358: return;
359: }
360:
361: throw Assert
362: .failure("Your read on object "
363: + this
364: + " was incorrect. You read the value "
365: + result
366: + " at some point between "
367: + read.startedAt()
368: + " and "
369: + now
370: + ", but the only writes that were in effect during that period are "
371: + potentials);
372: }
373:
374: private synchronized Set removeNotInEffectAsOfTime(Set source,
375: Timing effectiveTime) {
376: Set out = new HashSet();
377:
378: // Go find the latest start of all the writes that have committed before our cutoff time.
379: Timing latestStart = null;
380: Iterator sourceIter = source.iterator();
381: while (sourceIter.hasNext()) {
382: Write sourceWrite = (Write) sourceIter.next();
383: if (!sourceWrite.isCommitted()
384: || sourceWrite.committedAt().isAfterOrEqual(
385: effectiveTime))
386: continue;
387: if (latestStart == null
388: || latestStart.isBefore(sourceWrite.startedAt()))
389: latestStart = sourceWrite.startedAt();
390: }
391:
392: // Go filter out those that were committed before that start -- they couldn't possibly be affecting us any more.
393: sourceIter = source.iterator();
394: while (sourceIter.hasNext()) {
395: Write sourceWrite = (Write) sourceIter.next();
396: if (sourceWrite.isCommitted()
397: && sourceWrite.committedAt().isBefore(latestStart))
398: continue;
399: out.add(sourceWrite);
400: }
401:
402: return out;
403: }
404:
405: private synchronized Set removeTooYoung(Set source, Timing notAfter) {
406: Set out = new HashSet();
407:
408: Iterator sourceIter = source.iterator();
409: while (sourceIter.hasNext()) {
410: Write sourceWrite = (Write) sourceIter.next();
411: if (sourceWrite.startedAt().isAfter(notAfter))
412: continue;
413: out.add(sourceWrite);
414: }
415:
416: return out;
417: }
418:
419: private synchronized void gc(long now) {
420: Timing gcTime = createTiming(
421: Math.min(now, earliestReadStart()), -Math.max(
422: this .slop, this .gcSlop));
423: Set newSet = removeNotInEffectAsOfTime(this .currentWrites,
424: gcTime);
425:
426: this .currentWrites.clear();
427: this .currentWrites.addAll(newSet);
428: }
429:
430: private synchronized long earliestReadStart() {
431: long out = Long.MAX_VALUE;
432: Iterator iter = this .currentReads.iterator();
433: while (iter.hasNext()) {
434: Read read = (Read) iter.next();
435: out = Math.min(out, read.startedAt.rawTime());
436: }
437:
438: return out;
439: }
440:
441: private boolean valueEquals(Object one, Object two) {
442: if ((one == null) != (two == null))
443: return false;
444: if (one == null)
445: return true;
446: return one.equals(two);
447: }
448:
449: public String toString() {
450: return new OurStringBuilder(this ,
451: OurStringBuilder.COMPACT_STYLE).append("name",
452: this .name).append("slop", this.slop).toString();
453: }
454:
455: }
|