001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.bio.util;
019:
/**
 * The class <b>SingleRemoveSynchronizedAddLock</b> implements locking for
 * accessing a queue by a single remove thread and multiple add threads.
 *
 * A thread is only allowed to be either the remove thread or
 * an add thread.
 *
 * The lock can either be owned by the remove thread
 * or by a single add thread.
 *
 * If the remove thread tries to get the lock,
 * but the queue is empty, it will block (poll)
 * until an add thread adds an entry to the queue and
 * releases the lock.
 *
 * If the remove thread and add threads compete for
 * the lock and an add thread releases the lock, then
 * the remove thread will get the lock first.
 *
 * The remove thread removes all entries in the queue
 * at once and processes them without further
 * polling the queue.
 *
 * The lock is not reentrant, in the sense that all
 * threads must release an owned lock before competing
 * for the lock again!
 *
 * All state is guarded by the monitor of the lock instance itself; every
 * public method is <code>synchronized</code>.
 *
 * @author Rainer Jung
 * @author Peter Rossbach
 * @version 1.1
 */
public class SingleRemoveSynchronizedAddLock {

    /**
     * Time in milliseconds after which threads
     * waiting for an add lock are woken up.
     * This is used as a safety measure in case
     * thread notification via the unlock methods
     * has a bug.
     * The value is handed to <code>Object.wait(long)</code> unchanged, so
     * <code>0</code> waits without a timeout and a negative value makes
     * a blocking {@link #lockAdd()} throw IllegalArgumentException.
     */
    private long addWaitTimeout = 10000L;

    /**
     * Time in milliseconds after which threads
     * waiting for a remove lock are woken up.
     * This is used as a safety measure in case
     * thread notification via the unlock methods
     * has a bug.
     * The value is handed to <code>Object.wait(long)</code> unchanged, so
     * <code>0</code> waits without a timeout and a negative value makes
     * a blocking {@link #lockRemove()} throw IllegalArgumentException.
     */
    private long removeWaitTimeout = 30000L;

    /**
     * The current remove thread.
     * It is set to the remove thread while that thread is polling
     * (waiting) inside {@link #lockRemove()} and is reset to null
     * as soon as the remove thread stops polling.
     */
    private Thread remover = null;

    /**
     * A flag indicating if an add thread owns the lock.
     */
    private boolean addLocked = false;

    /**
     * A flag indicating if the remove thread owns the lock.
     */
    private boolean removeLocked = false;

    /**
     * A flag indicating if the remove thread is allowed
     * to wait for the lock. The flag is set to false when aborting
     * and set back to true on every call of {@link #lockRemove()}.
     */
    private boolean removeEnabled = true;

    /**
     * A flag indicating if the remover needs polling.
     * It indicates if the locked object has data available
     * to be removed.
     */
    private boolean dataAvailable = false;

    /**
     * Creates a lock with no data available, i.e. the remove thread
     * will poll until an add thread reports data.
     */
    public SingleRemoveSynchronizedAddLock() {
    }

    /**
     * Creates a lock with the given initial data state.
     *
     * @param dataAvailable true if the locked object already has data
     *                      available for the remove thread
     */
    public SingleRemoveSynchronizedAddLock(boolean dataAvailable) {
        this.dataAvailable = dataAvailable;
    }

    /**
     * @return Value of addWaitTimeout
     */
    public synchronized long getAddWaitTimeout() {
        return addWaitTimeout;
    }

    /**
     * Set value of addWaitTimeout
     *
     * @param timeout wake-up interval in milliseconds for waiting add threads
     */
    public synchronized void setAddWaitTimeout(long timeout) {
        addWaitTimeout = timeout;
    }

    /**
     * @return Value of removeWaitTimeout
     */
    public synchronized long getRemoveWaitTimeout() {
        return removeWaitTimeout;
    }

    /**
     * Set value of removeWaitTimeout
     *
     * @param timeout wake-up interval in milliseconds for the waiting remove thread
     */
    public synchronized void setRemoveWaitTimeout(long timeout) {
        removeWaitTimeout = timeout;
    }

    /**
     * Check if the locked object has data available
     * i.e. the remover can stop polling and get the lock.
     * @return True iff the lock Object has data available.
     */
    public synchronized boolean isDataAvailable() {
        return dataAvailable;
    }

    /**
     * Check if an add thread owns the lock.
     * @return True iff an add thread owns the lock.
     */
    public synchronized boolean isAddLocked() {
        return addLocked;
    }

    /**
     * Check if the remove thread owns the lock.
     * @return True iff the remove thread owns the lock.
     */
    public synchronized boolean isRemoveLocked() {
        return removeLocked;
    }

    /**
     * Check if the remove thread is polling.
     * @return True iff the remove thread is polling.
     */
    public synchronized boolean isRemovePolling() {
        return remover != null;
    }

    /**
     * Acquires the lock by an add thread and sets the add flag.
     * If any add thread or the remove thread already acquired the lock
     * this add thread will block until the lock is released.
     *
     * The wait itself is not cancellable: an interrupt received while
     * waiting does not abort the acquisition. It is remembered, however,
     * and the interrupt status of the calling thread is set again before
     * this method returns, so the caller can still observe it.
     */
    public synchronized void lockAdd() {
        boolean interrupted = false;
        try {
            while (addLocked || removeLocked) {
                try {
                    wait(addWaitTimeout);
                } catch (InterruptedException e) {
                    // Do not re-assert the flag here: wait() would then
                    // throw immediately and this loop would spin.
                    interrupted = true;
                }
            }
            addLocked = true;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Acquires the lock by the remove thread and sets the remove flag.
     * If any add thread already acquired the lock or the queue is
     * empty, the remove thread will block until the lock is released
     * and the queue is not empty, or until {@link #abortRemove()} is called.
     *
     * Every call re-enables removal first, so an abort issued before this
     * method is entered has no effect on it.
     *
     * @return true if the remove thread now owns the lock, false if
     *         polling was aborted
     */
    public synchronized boolean lockRemove() {
        removeLocked = false;
        removeEnabled = true;
        if (addLocked || !dataAvailable) {
            remover = Thread.currentThread();
            try {
                while ((addLocked || !dataAvailable) && removeEnabled) {
                    try {
                        wait(removeWaitTimeout);
                    } catch (InterruptedException e) {
                        // unlockAdd() and abortRemove() wake the polling
                        // remover via interrupt(), so the interrupt is
                        // consumed here on purpose.
                        Thread.interrupted();
                    }
                }
            } finally {
                // Always leave the polling state, even if wait() failed
                // (e.g. negative timeout); otherwise other threads would
                // keep interrupting a thread that no longer polls.
                remover = null;
            }
        }
        if (removeEnabled) {
            removeLocked = true;
        }
        return removeLocked;
    }

    /**
     * Releases the lock by an add thread and resets the add flag.
     * If the remove thread is polling and either data is now available or
     * removal has been aborted, only the remove thread is woken up (by
     * interrupting it), which lets it take the lock before waiting add
     * threads. Otherwise all waiting threads are notified.
     *
     * @param dataAvailable true if the locked object has data available
     *                      for the remove thread after this add
     */
    public synchronized void unlockAdd(boolean dataAvailable) {
        addLocked = false;
        this.dataAvailable = dataAvailable;
        if ((remover != null) && (dataAvailable || !removeEnabled)) {
            remover.interrupt();
        } else {
            notifyAll();
        }
    }

    /**
     * Releases the lock by the remove thread, resets the remove flag
     * and marks the locked object as having no data available.
     * Notifies all waiting threads
     * that the lock has been released by the remove thread.
     */
    public synchronized void unlockRemove() {
        removeLocked = false;
        dataAvailable = false;
        notifyAll();
    }

    /**
     * Abort any polling remover thread.
     * Disables removal and, if the remove thread is currently polling,
     * interrupts it so that its {@link #lockRemove()} call returns false.
     */
    public synchronized void abortRemove() {
        removeEnabled = false;
        if (remover != null) {
            remover.interrupt();
        }
    }

}
|