001: /*
002: * Created on 01-Sep-2005
003: *
004: */
005: package com.jofti.cache;
006:
007: import com.jofti.oswego.concurrent.Sync;
008:
009: /**
010:
011: * This is an adaptation of Doug Lea's ReaderWriter lock to allow effectively two groups of readers.
012: * Any number of readers in one of the groups can be share one lock, but only one group of readers
013: * can be active at once. The difference is that the writer lock (which was a single entry) now behaves like a read lock
014: * but without the single access restriction.
015: *
016: * @author xenephon (xenephon@jofti.com)
017: *
018: */
019: public class QueryUpdateLock {
020:
021: protected class WriterLock extends Signaller implements Sync {
022:
023: public void acquire() throws InterruptedException {
024: if (Thread.interrupted())
025: throw new InterruptedException();
026: InterruptedException interruptedexception = null;
027: synchronized (this ) {
028: if (!startWriteFromNewWriter())
029: try {
030: do
031: wait();
032: while (!startWriteFromWaitingWriter());
033: return;
034: } catch (InterruptedException interruptedexception1) {
035: cancelledWaitingWriter();
036: interruptedexception = interruptedexception1;
037: }
038: }
039: if (interruptedexception != null) {
040: readerLock_.signalWaiters();
041: throw interruptedexception;
042: } else {
043: return;
044: }
045: }
046:
047: public void release() {
048: Signaller signaller = endWrite();
049: if (signaller != null)
050: signaller.signalWaiters();
051: }
052:
053: synchronized void signalWaiters() {
054: notify();
055: }
056:
057: public boolean attempt(long l) throws InterruptedException {
058: if (Thread.interrupted())
059: throw new InterruptedException();
060: InterruptedException interruptedexception = null;
061: synchronized (this ) {
062: if (l <= 0L) {
063: boolean flag = startWrite();
064: return flag;
065: }
066: if (startWriteFromNewWriter()) {
067: boolean flag1 = true;
068: return flag1;
069: }
070: long l1 = l;
071: long l2 = System.currentTimeMillis();
072: do {
073: try {
074: wait(l1);
075: } catch (InterruptedException interruptedexception1) {
076: cancelledWaitingWriter();
077: notify();
078: interruptedexception = interruptedexception1;
079: break;
080: }
081: if (startWriteFromWaitingWriter()) {
082: boolean flag2 = true;
083: return flag2;
084: }
085: l1 = l - (System.currentTimeMillis() - l2);
086: if (l1 > 0L)
087: continue;
088: cancelledWaitingWriter();
089: notify();
090: break;
091: } while (true);
092: }
093: readerLock_.signalWaiters();
094: if (interruptedexception != null)
095: throw interruptedexception;
096: else
097: return false;
098: }
099:
100: protected WriterLock() {
101: }
102: }
103:
104: protected class ReaderLock extends Signaller implements Sync {
105:
106: public void acquire() throws InterruptedException {
107: if (Thread.interrupted())
108: throw new InterruptedException();
109: InterruptedException interruptedexception = null;
110: synchronized (this ) {
111: if (!startReadFromNewReader())
112: try {
113: do
114: wait();
115: while (!startReadFromWaitingReader());
116: return;
117: } catch (InterruptedException interruptedexception1) {
118: cancelledWaitingReader();
119: interruptedexception = interruptedexception1;
120: }
121: }
122: if (interruptedexception != null) {
123: writerLock_.signalWaiters();
124: throw interruptedexception;
125: } else {
126: return;
127: }
128: }
129:
130: public void release() {
131: Signaller signaller = endRead();
132: if (signaller != null)
133: signaller.signalWaiters();
134: }
135:
136: synchronized void signalWaiters() {
137: notifyAll();
138: }
139:
140: public boolean attempt(long l) throws InterruptedException {
141: if (Thread.interrupted())
142: throw new InterruptedException();
143: InterruptedException interruptedexception = null;
144: synchronized (this ) {
145: if (l <= 0L) {
146: boolean flag = startRead();
147: return flag;
148: }
149: if (startReadFromNewReader()) {
150: boolean flag1 = true;
151: return flag1;
152: }
153: long l1 = l;
154: long l2 = System.currentTimeMillis();
155: do {
156: try {
157: wait(l1);
158: } catch (InterruptedException interruptedexception1) {
159: cancelledWaitingReader();
160: interruptedexception = interruptedexception1;
161: break;
162: }
163: if (startReadFromWaitingReader()) {
164: boolean flag2 = true;
165: return flag2;
166: }
167: l1 = l - (System.currentTimeMillis() - l2);
168: if (l1 > 0L)
169: continue;
170: cancelledWaitingReader();
171: break;
172: } while (true);
173: }
174: writerLock_.signalWaiters();
175: if (interruptedexception != null)
176: throw interruptedexception;
177: else
178: return false;
179: }
180:
181: protected ReaderLock() {
182: }
183: }
184:
185: protected abstract class Signaller {
186:
187: abstract void signalWaiters();
188:
189: protected Signaller() {
190: }
191: }
192:
193: public QueryUpdateLock() {
194: activeReaders_ = 0L;
195: activeWriters_ = 0l;
196: waitingReaders_ = 0L;
197: waitingWriters_ = 0L;
198: }
199:
200: public Sync queryLock() {
201: return writerLock_;
202: }
203:
204: public Sync updateLock() {
205: return readerLock_;
206: }
207:
208: protected synchronized void cancelledWaitingReader() {
209: waitingReaders_--;
210: }
211:
212: protected synchronized void cancelledWaitingWriter() {
213: waitingWriters_--;
214: }
215:
216: protected boolean allowReader() {
217: return activeWriters_ <= 0l;
218: }
219:
220: protected boolean allowWriters() {
221: return activeReaders_ <= 0l;
222: }
223:
224: protected synchronized boolean startRead() {
225: boolean flag = allowReader();
226: if (flag) {
227:
228: activeReaders_++;
229: }
230: return flag;
231: }
232:
233: protected synchronized boolean startWrite() {
234: boolean flag = allowWriters();
235: if (flag) {
236:
237: activeWriters_++;
238: }
239: return flag;
240: }
241:
242: protected synchronized boolean startReadFromNewReader() {
243: boolean flag = startRead();
244: if (!flag) {
245: waitingReaders_++;
246: }
247:
248: return flag;
249: }
250:
251: protected synchronized boolean startWriteFromNewWriter() {
252: boolean flag = startWrite();
253: if (!flag)
254: waitingWriters_++;
255: return flag;
256: }
257:
258: protected synchronized boolean startReadFromWaitingReader() {
259: boolean flag = startRead();
260: if (flag)
261: waitingReaders_--;
262:
263: return flag;
264: }
265:
266: protected synchronized boolean startWriteFromWaitingWriter() {
267: boolean flag = startWrite();
268: if (flag) {
269: waitingWriters_--;
270: }
271: return flag;
272: }
273:
274: protected synchronized Signaller endRead() {
275: --activeReaders_;
276:
277: // these are the changes so the readers and writers form two groups
278: if (waitingWriters_ > 0L) {
279:
280: return writerLock_;
281: }
282: if (waitingReaders_ > 0L) {
283:
284: return readerLock_;
285: } else
286: return null;
287: }
288:
289: protected synchronized Signaller endWrite() {
290: --activeWriters_;
291:
292: if (waitingReaders_ > 0L) {
293: return readerLock_;
294: }
295: if (waitingWriters_ > 0L) {
296: return writerLock_;
297: } else
298: return null;
299: }
300:
301: protected long activeReaders_;
302: protected long activeWriters_;
303: protected long waitingReaders_;
304: protected long waitingWriters_;
305: protected final ReaderLock readerLock_ = new ReaderLock();
306: protected final WriterLock writerLock_ = new WriterLock();
307: }
|