001: package org.objectweb.celtix.bus.workqueue;
002:
003: import java.util.concurrent.RejectedExecutionException;
004: import java.util.concurrent.locks.Condition;
005: import java.util.concurrent.locks.Lock;
006: import java.util.concurrent.locks.ReentrantLock;
007:
008: import junit.framework.TestCase;
009:
010: public class AutomaticWorkQueueTest extends TestCase {
011:
012: public static final int UNBOUNDED_MAX_QUEUE_SIZE = -1;
013: public static final int UNBOUNDED_HIGH_WATER_MARK = -1;
014: public static final int UNBOUNDED_LOW_WATER_MARK = -1;
015:
016: public static final int INITIAL_SIZE = 2;
017: public static final int DEFAULT_MAX_QUEUE_SIZE = 10;
018: public static final int DEFAULT_HIGH_WATER_MARK = 10;
019: public static final int DEFAULT_LOW_WATER_MARK = 1;
020: public static final long DEFAULT_DEQUEUE_TIMEOUT = 2 * 60 * 1000L;
021:
022: public static final int TIMEOUT = 100;
023:
024: AutomaticWorkQueueImpl workqueue;
025:
026: public void tearDown() throws Exception {
027: if (workqueue != null) {
028: workqueue.shutdown(true);
029: workqueue = null;
030: }
031: }
032:
033: public void testUnboundedConstructor() {
034: workqueue = new AutomaticWorkQueueImpl(
035: UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
036: UNBOUNDED_HIGH_WATER_MARK, UNBOUNDED_LOW_WATER_MARK,
037: DEFAULT_DEQUEUE_TIMEOUT);
038: assertNotNull(workqueue);
039: assertEquals(AutomaticWorkQueueImpl.DEFAULT_MAX_QUEUE_SIZE,
040: workqueue.getMaxSize());
041: assertEquals(UNBOUNDED_HIGH_WATER_MARK, workqueue
042: .getHighWaterMark());
043: assertEquals(UNBOUNDED_LOW_WATER_MARK, workqueue
044: .getLowWaterMark());
045: }
046:
047: public void testConstructor() {
048: workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE,
049: INITIAL_SIZE, DEFAULT_HIGH_WATER_MARK,
050: DEFAULT_LOW_WATER_MARK, DEFAULT_DEQUEUE_TIMEOUT);
051: assertNotNull(workqueue);
052: assertEquals(DEFAULT_MAX_QUEUE_SIZE, workqueue.getMaxSize());
053: assertEquals(DEFAULT_HIGH_WATER_MARK, workqueue
054: .getHighWaterMark());
055: assertEquals(DEFAULT_LOW_WATER_MARK, workqueue
056: .getLowWaterMark());
057: }
058:
059: public void testEnqueue() {
060: workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE,
061: INITIAL_SIZE, DEFAULT_HIGH_WATER_MARK,
062: DEFAULT_LOW_WATER_MARK, DEFAULT_DEQUEUE_TIMEOUT);
063:
064: try {
065: Thread.sleep(100);
066: } catch (Exception e) {
067: // ignore
068: }
069:
070: // We haven't enqueued anything yet, so should be zero
071: assertEquals(0, workqueue.getSize());
072: assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
073:
074: // Check that no threads are working yet, as we haven't enqueued
075: // anything yet.
076: assertEquals(0, workqueue.getActiveCount());
077:
078: workqueue.execute(new TestWorkItem(), TIMEOUT);
079:
080: // Give threads a chance to dequeue (5sec max)
081: int i = 0;
082: while (workqueue.getSize() != 0 && i++ < 50) {
083: try {
084: Thread.sleep(100);
085: } catch (InterruptedException ie) {
086: // ignore
087: }
088: }
089: assertEquals(0, workqueue.getSize());
090: }
091:
092: public void testEnqueueImmediate() {
093: workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE,
094: INITIAL_SIZE, DEFAULT_HIGH_WATER_MARK,
095: DEFAULT_LOW_WATER_MARK, DEFAULT_DEQUEUE_TIMEOUT);
096:
097: try {
098: Thread.sleep(100);
099: } catch (Exception e) {
100: // ignore
101: }
102:
103: // We haven't enqueued anything yet, so should there shouldn't be
104: // any items on the queue, the thread pool should still be the
105: // initial size and no threads should be working
106: //
107: assertEquals(0, workqueue.getSize());
108: assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
109: assertEquals(0, workqueue.getActiveCount());
110:
111: BlockingWorkItem[] workItems = new BlockingWorkItem[DEFAULT_HIGH_WATER_MARK];
112: BlockingWorkItem[] fillers = new BlockingWorkItem[DEFAULT_MAX_QUEUE_SIZE];
113:
114: try {
115: // fill up the queue, then exhaust the thread pool
116: //
117: for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
118: workItems[i] = new BlockingWorkItem();
119: try {
120: workqueue.execute(workItems[i]);
121: } catch (RejectedExecutionException ex) {
122: fail("failed on item[" + i + "] with: " + ex);
123: }
124: }
125:
126: while (workqueue.getActiveCount() < INITIAL_SIZE) {
127: try {
128: Thread.sleep(250);
129: } catch (InterruptedException ex) {
130: // ignore
131: }
132: }
133:
134: for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
135: fillers[i] = new BlockingWorkItem();
136: try {
137: workqueue.execute(fillers[i]);
138: } catch (RejectedExecutionException ex) {
139: fail("failed on filler[" + i + "] with: " + ex);
140: }
141: }
142:
143: // give threads a chance to start executing the work items
144: try {
145: Thread.sleep(250);
146: } catch (InterruptedException ex) {
147: // ignore
148: }
149:
150: assertTrue(workqueue.toString(), workqueue.isFull());
151: assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK,
152: workqueue.getPoolSize());
153: assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK,
154: workqueue.getActiveCount());
155:
156: try {
157: workqueue.execute(new BlockingWorkItem());
158: fail("workitem should not have been accepted.");
159: } catch (RejectedExecutionException ex) {
160: // ignore
161: }
162:
163: // unblock one work item and allow thread to dequeue next item
164:
165: workItems[0].unblock();
166: boolean accepted = false;
167: workItems[0] = new BlockingWorkItem();
168:
169: for (int i = 0; i < 20 && !accepted; i++) {
170: try {
171: Thread.sleep(100);
172: } catch (InterruptedException ex) {
173: // ignore
174: }
175: try {
176: workqueue.execute(workItems[0]);
177: accepted = true;
178: } catch (RejectedExecutionException ex) {
179: // ignore
180: }
181: }
182: assertTrue(accepted);
183: } finally {
184: for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
185: if (workItems[i] != null) {
186: workItems[i].unblock();
187: }
188: }
189: for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
190: if (fillers[i] != null) {
191: fillers[i].unblock();
192: }
193: }
194: }
195: }
196:
197: public void testDeadLockEnqueueLoads() {
198: workqueue = new AutomaticWorkQueueImpl(500, 1, 2, 2,
199: DEFAULT_DEQUEUE_TIMEOUT);
200: DeadLockThread dead = new DeadLockThread(workqueue, 200, 10L);
201:
202: assertTrue(checkDeadLock(dead));
203: }
204:
205: public void testNonDeadLockEnqueueLoads() {
206: workqueue = new AutomaticWorkQueueImpl(
207: UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
208: UNBOUNDED_HIGH_WATER_MARK, UNBOUNDED_LOW_WATER_MARK,
209: DEFAULT_DEQUEUE_TIMEOUT);
210: DeadLockThread dead = new DeadLockThread(workqueue, 200);
211:
212: assertTrue(checkDeadLock(dead));
213: }
214:
215: public void testSchedule() throws Exception {
216: workqueue = new AutomaticWorkQueueImpl(
217: UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
218: UNBOUNDED_HIGH_WATER_MARK, UNBOUNDED_LOW_WATER_MARK,
219: DEFAULT_DEQUEUE_TIMEOUT);
220: final Lock runLock = new ReentrantLock();
221: final Condition runCondition = runLock.newCondition();
222: long start = System.currentTimeMillis();
223: Runnable doNothing = new Runnable() {
224: public void run() {
225: runLock.lock();
226: try {
227: runCondition.signal();
228: } finally {
229: runLock.unlock();
230: }
231: }
232: };
233:
234: workqueue.schedule(doNothing, 5000);
235:
236: runLock.lock();
237: try {
238: runCondition.await();
239: } finally {
240: runLock.unlock();
241: }
242:
243: assertTrue("expected delay",
244: System.currentTimeMillis() - start >= 4950);
245: }
246:
247: public void testThreadPoolShrink() {
248: workqueue = new AutomaticWorkQueueImpl(
249: UNBOUNDED_MAX_QUEUE_SIZE, 20, 20, 10, 100L);
250:
251: DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);
252:
253: assertTrue("Should be finished, probably deadlocked",
254: checkDeadLock(dead));
255:
256: // Give threads a chance to dequeue (5sec max)
257: int i = 0;
258: while (workqueue.getPoolSize() != 10 && i++ < 50) {
259: try {
260: Thread.sleep(100);
261: } catch (InterruptedException ie) {
262: // ignore
263: }
264: }
265: assertEquals(workqueue.getLowWaterMark(), workqueue
266: .getPoolSize());
267: }
268:
269: public void testThreadPoolShrinkUnbounded() {
270: workqueue = new AutomaticWorkQueueImpl(
271: UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
272: UNBOUNDED_HIGH_WATER_MARK, DEFAULT_LOW_WATER_MARK, 100L);
273:
274: DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);
275: assertTrue("Should be finished, probably deadlocked",
276: checkDeadLock(dead));
277:
278: // Give threads a chance to dequeue (5sec max)
279: int i = 0;
280: int last = workqueue.getPoolSize();
281: while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK
282: && i++ < 50) {
283: if (last != workqueue.getPoolSize()) {
284: last = workqueue.getPoolSize();
285: i = 0;
286: }
287: try {
288: Thread.sleep(100);
289: } catch (InterruptedException ie) {
290: // ignore
291: }
292: }
293: assertTrue("threads_total()",
294: workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
295: }
296:
297: public void testShutdown() {
298: workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE,
299: INITIAL_SIZE, INITIAL_SIZE, INITIAL_SIZE, 250);
300:
301: assertEquals(0, workqueue.getSize());
302: DeadLockThread dead = new DeadLockThread(workqueue, 100, 5L);
303: dead.start();
304: assertTrue(checkCompleted(dead));
305:
306: workqueue.shutdown(true);
307:
308: // Give threads a chance to shutdown (1 sec max)
309: for (int i = 0; i < 20
310: && (workqueue.getSize() > 0 || workqueue.getPoolSize() > 0); i++) {
311: try {
312: Thread.sleep(250);
313: } catch (InterruptedException ie) {
314: // ignore
315: }
316: }
317: assertEquals(0, workqueue.getSize());
318: assertEquals(0, workqueue.getPoolSize());
319:
320: //already shutdown
321: workqueue = null;
322: }
323:
324: private boolean checkCompleted(DeadLockThread dead) {
325: int oldCompleted = 0;
326: int newCompleted = 0;
327: int noProgressCount = 0;
328: while (!dead.isFinished()) {
329: newCompleted = dead.getWorkItemCompletedCount();
330: if (newCompleted > oldCompleted) {
331: oldCompleted = newCompleted;
332: noProgressCount = 0;
333: } else {
334: // No reduction in the completion count so it may be deadlocked,
335: // allow thread to make no progress for 5 time-slices before
336: // assuming a deadlock has occurred
337: //
338: if (oldCompleted != 0 && ++noProgressCount > 5) {
339: return false;
340: }
341: }
342: try {
343: Thread.sleep(250);
344: } catch (InterruptedException ie) {
345: // ignore
346: }
347: }
348: return true;
349: }
350:
351: private boolean checkDeadLock(DeadLockThread dead) {
352: dead.start();
353: return checkCompleted(dead);
354: }
355:
356: public class TestWorkItem implements Runnable {
357: String name;
358: long worktime;
359: Callback callback;
360:
361: public TestWorkItem() {
362: this ("WI");
363: }
364:
365: public TestWorkItem(String n) {
366: this (n, DeadLockThread.DEFAULT_WORK_TIME);
367: }
368:
369: public TestWorkItem(String n, long wt) {
370: this (n, wt, null);
371: }
372:
373: public TestWorkItem(String n, long wt, Callback c) {
374: name = n;
375: worktime = wt;
376: callback = c;
377: }
378:
379: public void run() {
380: try {
381: try {
382: Thread.sleep(worktime);
383: } catch (InterruptedException ie) {
384: // ignore
385: return;
386: }
387: } finally {
388: if (callback != null) {
389: callback.workItemCompleted(name);
390: }
391: }
392: }
393:
394: public String toString() {
395: return "[TestWorkItem:name=" + name + "]";
396: }
397: }
398:
/**
 * Work item whose run() does not return until unblock() has been called.
 */
public class BlockingWorkItem implements Runnable {
    // Read and written only while holding this object's monitor.
    private boolean unblocked;

    /**
     * Waits until unblock() has been called. Interrupts do not end the wait;
     * if one arrived, the interrupt status is set again just before
     * returning so the running thread can still see it.
     */
    public void run() {
        boolean interrupted = false;
        synchronized (this) {
            // Loop on the flag: wait() may return without a notification
            while (!unblocked) {
                try {
                    wait();
                } catch (InterruptedException ie) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Releases every thread currently waiting in run(); later calls to
     * run() return without waiting.
     */
    void unblock() {
        synchronized (this) {
            unblocked = true;
            // notifyAll rather than notify: if the same item is being run
            // by more than one thread, all of them must be released.
            notifyAll();
        }
    }
}
421:
/**
 * Receives a notification when a TestWorkItem's run() ends.
 */
public interface Callback {
    /**
     * Called by a work item at the end of its run() method.
     *
     * @param name the name the work item was created with
     */
    void workItemCompleted(String name);
}
425:
426: public class DeadLockThread extends Thread implements Callback {
427: public static final long DEFAULT_WORK_TIME = 10L;
428: public static final int DEFAULT_WORK_ITEMS = 200;
429:
430: AutomaticWorkQueueImpl workqueue;
431: int nWorkItems;
432: int nWorkItemsCompleted;
433: long worktime;
434: long finishTime;
435: long startTime;
436:
437: public DeadLockThread(AutomaticWorkQueueImpl wq) {
438: this (wq, DEFAULT_WORK_ITEMS, DEFAULT_WORK_TIME);
439: }
440:
441: public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi) {
442: this (wq, nwi, DEFAULT_WORK_TIME);
443: }
444:
445: public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi,
446: long wt) {
447: workqueue = wq;
448: nWorkItems = nwi;
449: worktime = wt;
450: }
451:
452: public synchronized boolean isFinished() {
453: return nWorkItemsCompleted == nWorkItems;
454: }
455:
456: public synchronized void workItemCompleted(String name) {
457: nWorkItemsCompleted++;
458: if (isFinished()) {
459: finishTime = System.currentTimeMillis();
460: }
461: }
462:
463: public int getWorkItemCount() {
464: return nWorkItems;
465: }
466:
467: public long worktime() {
468: return worktime;
469: }
470:
471: public synchronized int getWorkItemCompletedCount() {
472: return nWorkItemsCompleted;
473: }
474:
475: public long finishTime() {
476: return finishTime;
477: }
478:
479: public long duration() {
480: return finishTime - startTime;
481: }
482:
483: public void run() {
484: startTime = System.currentTimeMillis();
485:
486: for (int i = 0; i < nWorkItems; i++) {
487: try {
488: workqueue.execute(new TestWorkItem(String
489: .valueOf(i), worktime, this ), TIMEOUT);
490: } catch (RejectedExecutionException ex) {
491: // ignore
492: }
493: }
494: while (!isFinished()) {
495: try {
496: Thread.sleep(worktime);
497: } catch (InterruptedException ie) {
498: // ignore
499: }
500: }
501: }
502: }
503:
504: }
|