001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.bio.util;
019:
020: import org.apache.catalina.tribes.ChannelMessage;
021: import org.apache.catalina.tribes.Member;
022: import org.apache.catalina.tribes.group.InterceptorPayload;
023:
024: /**
025: * A fast queue that remover thread lock the adder thread. <br/>Limit the queue
026: * length when you have strange producer thread problemes.
027: *
028: * FIXME add i18n support to log messages
029: * @author Rainer Jung
030: * @author Peter Rossbach
031: * @version $Revision: 500684 $ $Date: 2007-01-28 00:27:18 +0100 (dim., 28 janv. 2007) $
032: */
033: public class FastQueue {
034:
035: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
036: .getLog(FastQueue.class);
037:
038: /**
039: * This is the actual queue
040: */
041: private SingleRemoveSynchronizedAddLock lock = null;
042:
043: /**
044: * First Object at queue (consumer message)
045: */
046: private LinkObject first = null;
047:
048: /**
049: * Last object in queue (producer Object)
050: */
051: private LinkObject last = null;
052:
053: /**
054: * Current Queue elements size
055: */
056: private int size = 0;
057:
058: /**
059: * check lock to detect strange threadings things
060: */
061: private boolean checkLock = false;
062:
063: /**
064: * protocol the thread wait times
065: */
066: private boolean timeWait = false;
067:
068: private boolean inAdd = false;
069:
070: private boolean inRemove = false;
071:
072: private boolean inMutex = false;
073:
074: /**
075: * limit the queue legnth ( default is unlimited)
076: */
077: private int maxQueueLength = 0;
078:
079: /**
080: * addWaitTimeout for producer
081: */
082: private long addWaitTimeout = 10000L;
083:
084: /**
085: * removeWaitTimeout for consumer
086: */
087: private long removeWaitTimeout = 30000L;
088:
089: /**
090: * enabled the queue
091: */
092: private boolean enabled = true;
093:
094: /**
095: * max queue size
096: */
097: private int maxSize = 0;
098:
099: /**
100: * avg size sample interval
101: */
102: private int sampleInterval = 100;
103:
104: /**
105: * Generate Queue SingleRemoveSynchronizedAddLock and set add and wait
106: * Timeouts
107: */
108: public FastQueue() {
109: lock = new SingleRemoveSynchronizedAddLock();
110: lock.setAddWaitTimeout(addWaitTimeout);
111: lock.setRemoveWaitTimeout(removeWaitTimeout);
112: }
113:
114: /**
115: * get current add wait timeout
116: *
117: * @return current wait timeout
118: */
119: public long getAddWaitTimeout() {
120: addWaitTimeout = lock.getAddWaitTimeout();
121: return addWaitTimeout;
122: }
123:
124: /**
125: * Set add wait timeout (default 10000 msec)
126: *
127: * @param timeout
128: */
129: public void setAddWaitTimeout(long timeout) {
130: addWaitTimeout = timeout;
131: lock.setAddWaitTimeout(addWaitTimeout);
132: }
133:
134: /**
135: * get current remove wait timeout
136: *
137: * @return The timeout
138: */
139: public long getRemoveWaitTimeout() {
140: removeWaitTimeout = lock.getRemoveWaitTimeout();
141: return removeWaitTimeout;
142: }
143:
144: /**
145: * set remove wait timeout ( default 30000 msec)
146: *
147: * @param timeout
148: */
149: public void setRemoveWaitTimeout(long timeout) {
150: removeWaitTimeout = timeout;
151: lock.setRemoveWaitTimeout(removeWaitTimeout);
152: }
153:
154: /**
155: * get Max Queue length
156: *
157: * @see org.apache.catalina.tribes.util.IQueue#getMaxQueueLength()
158: */
159: public int getMaxQueueLength() {
160: return maxQueueLength;
161: }
162:
163: public void setMaxQueueLength(int length) {
164: maxQueueLength = length;
165: }
166:
167: public boolean isEnabled() {
168: return enabled;
169: }
170:
171: public void setEnabled(boolean enable) {
172: enabled = enable;
173: if (!enabled) {
174: lock.abortRemove();
175: last = first = null;
176: }
177: }
178:
179: /**
180: * @return Returns the checkLock.
181: */
182: public boolean isCheckLock() {
183: return checkLock;
184: }
185:
186: /**
187: * @param checkLock The checkLock to set.
188: */
189: public void setCheckLock(boolean checkLock) {
190: this .checkLock = checkLock;
191: }
192:
193: /**
194: * @return The max size
195: */
196: public int getMaxSize() {
197: return maxSize;
198: }
199:
200: /**
201: * @param size
202: */
203: public void setMaxSize(int size) {
204: maxSize = size;
205: }
206:
207: /**
208: * unlock queue for next add
209: */
210: public void unlockAdd() {
211: lock.unlockAdd(size > 0 ? true : false);
212: }
213:
214: /**
215: * unlock queue for next remove
216: */
217: public void unlockRemove() {
218: lock.unlockRemove();
219: }
220:
221: /**
222: * start queuing
223: */
224: public void start() {
225: setEnabled(true);
226: }
227:
228: /**
229: * start queuing
230: */
231: public void stop() {
232: setEnabled(false);
233: }
234:
235: public int getSize() {
236: return size;
237: }
238:
239: public SingleRemoveSynchronizedAddLock getLock() {
240: return lock;
241: }
242:
243: /**
244: * Add new data to the queue
245: * @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String, java.lang.Object)
246: * FIXME extract some method
247: */
248: public boolean add(ChannelMessage msg, Member[] destination,
249: InterceptorPayload payload) {
250: boolean ok = true;
251: long time = 0;
252:
253: if (!enabled) {
254: if (log.isInfoEnabled())
255: log.info("FastQueue.add: queue disabled, add aborted");
256: return false;
257: }
258:
259: if (timeWait) {
260: time = System.currentTimeMillis();
261: }
262: lock.lockAdd();
263: try {
264: if (log.isTraceEnabled()) {
265: log.trace("FastQueue.add: starting with size " + size);
266: }
267: if (checkLock) {
268: if (inAdd)
269: log.warn("FastQueue.add: Detected other add");
270: inAdd = true;
271: if (inMutex)
272: log
273: .warn("FastQueue.add: Detected other mutex in add");
274: inMutex = true;
275: }
276:
277: if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
278: ok = false;
279: if (log.isTraceEnabled()) {
280: log
281: .trace("FastQueue.add: Could not add, since queue is full ("
282: + size
283: + ">="
284: + maxQueueLength
285: + ")");
286: }
287: } else {
288: LinkObject element = new LinkObject(msg, destination,
289: payload);
290: if (size == 0) {
291: first = last = element;
292: size = 1;
293: } else {
294: if (last == null) {
295: ok = false;
296: log
297: .error("FastQueue.add: Could not add, since last is null although size is "
298: + size + " (>0)");
299: } else {
300: last.append(element);
301: last = element;
302: size++;
303: }
304: }
305: }
306:
307: if (first == null) {
308: log.error("FastQueue.add: first is null, size is "
309: + size + " at end of add");
310: }
311: if (last == null) {
312: log.error("FastQueue.add: last is null, size is "
313: + size + " at end of add");
314: }
315:
316: if (checkLock) {
317: if (!inMutex)
318: log
319: .warn("FastQueue.add: Cancelled by other mutex in add");
320: inMutex = false;
321: if (!inAdd)
322: log.warn("FastQueue.add: Cancelled by other add");
323: inAdd = false;
324: }
325: if (log.isTraceEnabled())
326: log
327: .trace("FastQueue.add: add ending with size "
328: + size);
329:
330: } finally {
331: lock.unlockAdd(true);
332: }
333: return ok;
334: }
335:
336: /**
337: * remove the complete queued object list
338: * @see org.apache.catalina.tribes.util.IQueue#remove()
339: * FIXME extract some method
340: */
341: public LinkObject remove() {
342: LinkObject element;
343: boolean gotLock;
344: long time = 0;
345:
346: if (!enabled) {
347: if (log.isInfoEnabled())
348: log
349: .info("FastQueue.remove: queue disabled, remove aborted");
350: return null;
351: }
352:
353: if (timeWait) {
354: time = System.currentTimeMillis();
355: }
356: gotLock = lock.lockRemove();
357: try {
358:
359: if (!gotLock) {
360: if (enabled) {
361: if (log.isInfoEnabled())
362: log
363: .info("FastQueue.remove: Remove aborted although queue enabled");
364: } else {
365: if (log.isInfoEnabled())
366: log
367: .info("FastQueue.remove: queue disabled, remove aborted");
368: }
369: return null;
370: }
371:
372: if (log.isTraceEnabled()) {
373: log
374: .trace("FastQueue.remove: remove starting with size "
375: + size);
376: }
377: if (checkLock) {
378: if (inRemove)
379: log.warn("FastQueue.remove: Detected other remove");
380: inRemove = true;
381: if (inMutex)
382: log
383: .warn("FastQueue.remove: Detected other mutex in remove");
384: inMutex = true;
385: }
386:
387: element = first;
388:
389: first = last = null;
390: size = 0;
391:
392: if (checkLock) {
393: if (!inMutex)
394: log
395: .warn("FastQueue.remove: Cancelled by other mutex in remove");
396: inMutex = false;
397: if (!inRemove)
398: log
399: .warn("FastQueue.remove: Cancelled by other remove");
400: inRemove = false;
401: }
402: if (log.isTraceEnabled()) {
403: log.trace("FastQueue.remove: remove ending with size "
404: + size);
405: }
406:
407: if (timeWait) {
408: time = System.currentTimeMillis();
409: }
410: } finally {
411: lock.unlockRemove();
412: }
413: return element;
414: }
415:
416: }
|