001: // $Id: QueueTest.java,v 1.23 2006/09/14 13:24:33 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import junit.framework.TestCase;
006: import org.jgroups.TimeoutException;
007: import org.jgroups.util.Queue;
008: import org.jgroups.util.QueueClosedException;
009: import org.jgroups.util.Util;
010:
011: import java.util.LinkedList;
012: import java.util.ArrayList;
013:
014: public class QueueTest extends TestCase {
015: private Queue queue = null;
016:
/**
 * Creates the test case; the given name is handed straight to the
 * {@code TestCase} superclass constructor.
 *
 * @param Name_ name passed through to the superclass
 */
public QueueTest(String Name_) {
    super(Name_);
}
020:
021: public void setUp() throws Exception {
022: super .setUp();
023: queue = new Queue();
024: }
025:
026: public void tearDown() throws Exception {
027: super .tearDown();
028: if (queue != null) {
029: queue.reset();
030: }
031: }
032:
033: public void testQueue() {
034: try {
035: queue.add("Q1");
036: queue.add("Q2");
037: queue.add("Q3");
038:
039: assertEquals("Q1", queue.peek());
040: assertEquals("Q1", queue.remove());
041:
042: assertEquals("Q2", queue.peek());
043: assertEquals("Q2", queue.remove());
044:
045: queue.addAtHead("Q4");
046: queue.add("Q5");
047: assertEquals("Q4", queue.peek());
048: assertEquals("Q4", queue.remove());
049:
050: queue.close(true);
051:
052: try {
053: queue.add("Q6");
054: fail("should not get here");
055: } catch (org.jgroups.util.QueueClosedException qc) {
056: assertTrue(true);
057: }
058:
059: int size = queue.size();
060: queue.removeElement("Q5");
061: assertEquals((size - 1), queue.size());
062:
063: assertEquals("Q3", queue.peek());
064: assertEquals("Q3", queue.remove());
065: assertTrue(queue.closed());
066: System.out.println("Everything is ok");
067: } catch (Exception x) {
068: System.out.println(x);
069: fail();
070: }
071: }
072:
073: public void testCloseWithoutFlush() {
074: queue.close(false);
075: try {
076: queue.remove();
077: fail("we should have gotten a QueueClosedException trying to remove an element from a closed queue");
078: } catch (QueueClosedException e) {
079: assertTrue("queue is closed, this is okay", queue.closed());
080: }
081: }
082:
083: public void testCloseWithFlush() {
084: queue.close(true);
085: try {
086: queue.remove();
087: fail("we should have gotten a QueueClosedException trying to remove an element from a closed queue");
088: } catch (QueueClosedException e) {
089: assertTrue("queue is closed, this is okay", queue.closed());
090: }
091: }
092:
093: public void testCloseWithFlush2() throws QueueClosedException {
094: queue.add(new Integer(1));
095: queue.add(new Integer(2));
096: queue.add(new Integer(3));
097: queue.close(true);
098: try {
099: for (int i = 1; i <= 3; i++) {
100: Object obj = queue.remove();
101: assertNotNull(obj);
102: assertEquals(obj, new Integer(i));
103: }
104: queue.remove();
105: fail("we should have gotten a QueueClosedException trying to remove an element from a closed queue");
106: } catch (QueueClosedException e) {
107: assertTrue("queue is closed, this is okay", queue.closed());
108: }
109: }
110:
111: public void testValues() throws QueueClosedException {
112: queue.add(new Integer(1));
113: queue.add(new Integer(3));
114: queue.add(new Integer(99));
115: queue.add(new Integer(8));
116: System.out.println("queue: " + Util.dumpQueue(queue));
117: int size = queue.size();
118: assertEquals(4, size);
119: LinkedList values = queue.values();
120: assertEquals(size, values.size());
121: }
122:
123: public void testLargeInsertion() {
124: String element = "MyElement";
125: long start, stop;
126:
127: try {
128: System.out.println("Inserting 100000 elements");
129: start = System.currentTimeMillis();
130: for (int i = 0; i < 100000; i++)
131: queue.add(element);
132: stop = System.currentTimeMillis();
133: System.out.println("Took " + (stop - start) + " msecs");
134:
135: System.out.println("Removing 100000 elements");
136: start = System.currentTimeMillis();
137: while (queue.size() > 0)
138: queue.remove();
139: stop = System.currentTimeMillis();
140: System.out.println("Took " + (stop - start) + " msecs");
141: } catch (Exception ex) {
142: System.err.println(ex);
143: fail();
144: }
145: }
146:
147: public void testEmptyQueue() {
148: assertNull(queue.getFirst());
149: assertNull(queue.getLast());
150: assertEquals(queue.getFirst(), queue.getLast()); // both are null; they're equal
151: }
152:
153: public void testAddAll() throws QueueClosedException {
154: ArrayList l = new ArrayList();
155: l.add("one");
156: l.add("two");
157: l.add("three");
158: queue.addAll(l);
159: System.out.println("queue is " + queue);
160: assertEquals(3, queue.size());
161: assertEquals("one", queue.remove());
162: assertEquals(2, queue.size());
163: assertEquals("two", queue.remove());
164: assertEquals(1, queue.size());
165: assertEquals("three", queue.remove());
166: assertEquals(0, queue.size());
167: }
168:
169: public void testInsertionAndRemoval() throws Exception {
170: String s1 = "Q1", s2 = "Q2";
171:
172: queue.add(s1);
173: assertTrue(queue.getFirst() != null);
174: assertTrue(queue.getLast() != null);
175: assertEquals(queue.getFirst(), queue.getLast());
176:
177: queue.add(s2);
178: assertTrue(queue.getFirst() != queue.getLast());
179:
180: Object o1 = queue.peek();
181: Object o2 = queue.getFirst();
182:
183: System.out.println("o1=" + o1 + ", o2=" + o2
184: + ", o1.equals(o2)=" + o1.equals(o2));
185:
186: assertEquals(queue.peek(), queue.getFirst());
187: queue.remove();
188:
189: assertEquals(1, queue.size());
190: assertEquals(queue.getFirst(), queue.getLast());
191: queue.remove();
192:
193: assertEquals(0, queue.size());
194: assertTrue(queue.getFirst() == null);
195: assertTrue(queue.getLast() == null);
196: }
197:
198: public void testWaitUntilClosed() {
199: queue.close(true);
200: queue.waitUntilClosed(0);
201: assertEquals(0, queue.size());
202: }
203:
204: public void testWaitUntilClosed2() {
205: queue.close(true);
206: try {
207: queue.peek();
208: fail("peek() should throw a QueueClosedException");
209: } catch (QueueClosedException e) {
210: assertTrue(e != null);
211: }
212: assertEquals(0, queue.size());
213: }
214:
215: public void testWaitUntilClosed3() throws QueueClosedException {
216: queue.add("one");
217: queue.close(true);
218: Object obj = queue.peek();
219: assertEquals("one", obj);
220: assertEquals(1, queue.size());
221: queue.remove();
222: try {
223: queue.peek();
224: fail("peek() should throw a QueueClosedException");
225: } catch (QueueClosedException e) {
226: assertTrue(e != null);
227: }
228: assertEquals(0, queue.size());
229: }
230:
231: public void testWaitUntilClosed4() throws QueueClosedException {
232: for (int i = 0; i < 10; i++)
233: queue.add(new Integer(i));
234: new Thread() {
235: public void run() {
236: while (!queue.closed()) {
237: try {
238: System.out.println("-- removed "
239: + queue.remove());
240: Util.sleep(200);
241: } catch (QueueClosedException e) {
242: break;
243: }
244: }
245: }
246: }.start();
247: queue.close(true);
248: queue.waitUntilClosed(0);
249: assertEquals(0, queue.size());
250: }
251:
252: public void testWaitUntilClosed5() throws QueueClosedException {
253: for (int i = 0; i < 10; i++)
254: queue.add(new Integer(i));
255: new Thread() {
256: public void run() {
257: while (!queue.closed()) {
258: try {
259: System.out.println("-- removed "
260: + queue.remove());
261: Util.sleep(200);
262: } catch (QueueClosedException e) {
263: System.out
264: .println("-- queue is closed, cannot remove element");
265: break;
266: }
267: }
268: }
269: }.start();
270:
271: Util.sleep(600);
272: queue.close(false);
273: queue.waitUntilClosed(0);
274: assertTrue(queue.size() > 0);
275: }
276:
277: public void testRemoveElementNoElement() {
278: String s1 = "Q1";
279:
280: try {
281: queue.removeElement(s1);
282: assertFalse(queue.closed());
283: assertEquals(0, queue.size());
284: } catch (QueueClosedException ex) {
285: fail(ex.toString());
286: }
287: }
288:
289: public void testRemoveElementOneElement() {
290: String s1 = "Q1";
291:
292: try {
293: queue.add(s1);
294: queue.removeElement(s1);
295: assertEquals(0, queue.size());
296: assertTrue(queue.getFirst() == null);
297: assertTrue(queue.getLast() == null);
298: } catch (QueueClosedException ex) {
299: fail(ex.toString());
300: }
301: }
302:
303: public void testRemoveElementTwoElementsFirstFound() {
304: String s1 = "Q1", s2 = "Q2";
305:
306: try {
307: queue.add(s1);
308: queue.add(s2);
309: queue.removeElement(s1);
310: assertEquals(1, queue.size());
311: assertEquals(queue.getFirst(), s2);
312: assertEquals(queue.getLast(), s2);
313: assertEquals(queue.getFirst(), queue.getLast());
314: } catch (QueueClosedException ex) {
315: fail(ex.toString());
316: }
317: }
318:
319: public void testRemoveElementTwoElementsSecondFound() {
320: String s1 = "Q1", s2 = "Q2";
321:
322: try {
323: queue.add(s1);
324: queue.add(s2);
325: queue.removeElement(s2);
326: assertEquals(1, queue.size());
327: assertEquals(queue.getFirst(), s1);
328: assertEquals(queue.getLast(), s1);
329: assertEquals(queue.getFirst(), queue.getLast());
330: } catch (QueueClosedException ex) {
331: fail(ex.toString());
332: }
333: }
334:
335: public void testRemoveElementThreeElementsFirstFound() {
336: String s1 = "Q1", s2 = "Q2", s3 = "Q3";
337:
338: try {
339: queue.add(s1);
340: queue.add(s2);
341: queue.add(s3);
342: queue.removeElement(s1);
343: assertEquals(2, queue.size());
344: assertEquals(queue.getFirst(), s2);
345: assertEquals(queue.getLast(), s3);
346: } catch (QueueClosedException ex) {
347: fail(ex.toString());
348: }
349: }
350:
351: public void testRemoveElementThreeElementsSecondFound() {
352: String s1 = "Q1", s2 = "Q2", s3 = "Q3";
353:
354: try {
355: queue.add(s1);
356: queue.add(s2);
357: queue.add(s3);
358: queue.removeElement(s2);
359: assertEquals(2, queue.size());
360: assertEquals(queue.getFirst(), s1);
361: assertEquals(queue.getLast(), s3);
362: } catch (QueueClosedException ex) {
363: fail(ex.toString());
364: }
365: }
366:
367: public void testRemoveElementThreeElementsThirdFound() {
368: String s1 = "Q1", s2 = "Q2", s3 = "Q3";
369:
370: try {
371: queue.add(s1);
372: queue.add(s2);
373: queue.add(s3);
374: queue.removeElement(s3);
375: assertEquals(2, queue.size());
376: assertEquals(queue.getFirst(), s1);
377: assertEquals(queue.getLast(), s2);
378: } catch (QueueClosedException ex) {
379: fail(ex.toString());
380: }
381: }
382:
383: public void testRemoveAndClose() {
384: try {
385: new Thread() {
386: public void run() {
387: Util.sleep(1000);
388: queue.close(true); // close gracefully
389: }
390: }.start();
391:
392: queue.remove();
393: fail("we should not be able to remove an object from a closed queue");
394: } catch (QueueClosedException ex) {
395: assertTrue(ex instanceof QueueClosedException); // of course, stupid comparison...
396: }
397: }
398:
399: public void testRemoveAndCloseWithTimeout() throws TimeoutException {
400: try {
401: new Thread() {
402: public void run() {
403: Util.sleep(1000);
404: queue.close(true); // close gracefully
405: }
406: }.start();
407:
408: queue.remove(5000);
409: fail("we should not be able to remove an object from a closed queue");
410: } catch (QueueClosedException ex) {
411: assertTrue(ex instanceof QueueClosedException); // of course, stupid comparison...
412: } catch (TimeoutException timeout) {
413: fail("we should not get a TimeoutException, but a QueueClosedException here");
414: }
415: }
416:
417: public void testInterruptAndRemove() throws QueueClosedException {
418: Thread.currentThread().interrupt();
419: Object el = null;
420: try {
421: el = queue.remove(2000);
422: fail("we should not get here");
423: } catch (TimeoutException e) {
424: assertNull(el);
425: }
426: }
427:
428: public void testRemoveAndInterrupt() {
429:
430: Thread closer = new Thread() {
431: public void run() {
432: Util.sleep(1000);
433: System.out.println("-- closing queue");
434: queue.close(false);
435: }
436: };
437: closer.start();
438:
439: System.out.println("-- removing element");
440: try {
441: queue.remove();
442: fail("we should not get here, as the queue is closed");
443: } catch (QueueClosedException e) {
444: System.out
445: .println("-- received queue closed exception - as expected");
446: }
447:
448: }
449:
450: public void testClear() throws QueueClosedException {
451: queue.add("one");
452: queue.add("two");
453: assertEquals(2, queue.size());
454: queue.close(true);
455: assertEquals(2, queue.size());
456: queue.clear();
457: assertEquals(0, queue.size());
458: queue = new Queue();
459: queue.add("one");
460: queue.add("two");
461: queue.clear();
462: assertEquals(0, queue.size());
463: queue.add("one");
464: queue.add("two");
465: assertEquals(2, queue.size());
466: queue.clear();
467: assertEquals(0, queue.size());
468: }
469:
470: // public void testWaitUntilEmpty() {
471: // try {
472: // queue.add("one");
473: // queue.add("two");
474: // queue.add("three");
475: //
476: // new Thread() {
477: // public void run() {
478: // try {
479: // sleep(1000);
480: // queue.remove();
481: // queue.remove();
482: // queue.remove();
483: // }
484: // catch(Exception e) {
485: // }
486: // }
487: // }.start();
488: //
489: // queue.waitUntilEmpty(0);
490: // assertEquals(queue.size(), 0);
491: // }
492: // catch(Exception e) {
493: // e.printStackTrace();
494: // fail(e.toString());
495: // }
496: // }
497: //
498: // public void testWaitUntilEmpty2() {
499: // try {
500: // queue.add("one");
501: // queue.add("two");
502: // queue.add("three");
503: //
504: // new Thread() {
505: // public void run() {
506: // try {
507: // sleep(1000);
508: // queue.remove();
509: // queue.remove();
510: // }
511: // catch(Exception e) {
512: // }
513: // }
514: // }.start();
515: //
516: // queue.waitUntilEmpty(3000);
517: // fail("shouldn't get here; we should have caught a TimeoutException");
518: // }
519: // catch(TimeoutException timeout) {
520: // assertTrue(true);
521: // }
522: // catch(Exception e) {
523: // e.printStackTrace();
524: // fail(e.toString());
525: // }
526: // }
527: //
528: //
529: // public void testWaitUntilQueueClosed() {
530: // try {
531: // queue.add("one");
532: // queue.add("two");
533: // queue.add("three");
534: //
535: // new Thread() {
536: // public void run() {
537: // try {
538: // sleep(1000);
539: // queue.close(false);
540: // }
541: // catch(Exception e) {
542: // }
543: // }
544: // }.start();
545: //
546: // queue.waitUntilEmpty(0);
547: // fail("shouldn't get here; we should have caught a QueueClosedException");
548: // }
549: // catch(TimeoutException timeout) {
550: // fail("we should not have gottem here");
551: // }
552: // catch(QueueClosedException ex2) {
553: // assertTrue(true);
554: // }
555: // catch(Exception e) {
556: // e.printStackTrace();
557: // fail();
558: // }
559: // }
560:
561: /** Multiple threads call remove(), one threads then adds an element. Only 1 thread should actually terminate
562: * (the one that has the element) */
563: public void testBarrier() {
564: RemoveOneItem[] removers = new RemoveOneItem[10];
565: int num_dead = 0;
566:
567: for (int i = 0; i < removers.length; i++) {
568: removers[i] = new RemoveOneItem(i);
569: removers[i].start();
570: }
571:
572: Util.sleep(1000);
573:
574: System.out.println("-- adding element 99");
575: try {
576: queue.add(new Long(99));
577: } catch (Exception ex) {
578: System.err.println(ex);
579: }
580:
581: Util.sleep(5000);
582: System.out.println("-- adding element 100");
583: try {
584: queue.add(new Long(100));
585: } catch (Exception ex) {
586: System.err.println(ex);
587: }
588:
589: Util.sleep(1000);
590:
591: for (int i = 0; i < removers.length; i++) {
592: System.out.println("remover #" + i + " is "
593: + (removers[i].isAlive() ? "alive" : "terminated"));
594: if (!removers[i].isAlive()) {
595: num_dead++;
596: }
597: }
598:
599: assertEquals(2, num_dead);
600: }
601:
602: /** Multiple threads call remove(), one threads then adds an element. Only 1 thread should actually terminate
603: * (the one that has the element) */
604: public void testBarrierWithTimeOut() {
605: RemoveOneItemWithTimeout[] removers = new RemoveOneItemWithTimeout[10];
606: int num_dead = 0;
607:
608: for (int i = 0; i < removers.length; i++) {
609: removers[i] = new RemoveOneItemWithTimeout(i, 1000);
610: removers[i].start();
611: }
612:
613: Util.sleep(5000);
614:
615: System.out.println("-- adding element 99");
616: try {
617: queue.add(new Long(99));
618: } catch (Exception ex) {
619: System.err.println(ex);
620: }
621:
622: Util.sleep(5000);
623: System.out.println("-- adding element 100");
624: try {
625: queue.add(new Long(100));
626: } catch (Exception ex) {
627: System.err.println(ex);
628: }
629:
630: Util.sleep(1000);
631:
632: for (int i = 0; i < removers.length; i++) {
633: System.out.println("remover #" + i + " is "
634: + (removers[i].isAlive() ? "alive" : "terminated"));
635: if (!removers[i].isAlive()) {
636: num_dead++;
637: }
638: }
639:
640: assertEquals(2, num_dead);
641:
642: queue.close(false); // will cause all threads still blocking on peek() to return
643:
644: Util.sleep(2000);
645:
646: num_dead = 0;
647: for (int i = 0; i < removers.length; i++) {
648: System.out.println("remover #" + i + " is "
649: + (removers[i].isAlive() ? "alive" : "terminated"));
650: if (!removers[i].isAlive()) {
651: num_dead++;
652: }
653: }
654: assertEquals(10, num_dead);
655:
656: }
657:
658: /** Multiple threads add one element, one thread read them all.
659: * (the one that has the element) */
660: public void testMultipleWriterOneReader() {
661: AddOneItem[] adders = new AddOneItem[10];
662: int num_dead = 0;
663: int num_items = 0;
664: int items = 1000;
665:
666: for (int i = 0; i < adders.length; i++) {
667: adders[i] = new AddOneItem(i, items);
668: adders[i].start();
669: }
670:
671: while (num_items < (adders.length * items)) {
672: try {
673: queue.remove();
674: num_items++;
675: } catch (Exception ex) {
676: System.err.println(ex);
677: }
678: }
679:
680: Util.sleep(1000);
681:
682: for (int i = 0; i < adders.length; i++) {
683: System.out.println("adder #" + i + " is "
684: + (adders[i].isAlive() ? "alive" : "terminated"));
685: if (!adders[i].isAlive()) {
686: num_dead++;
687: }
688: }
689:
690: assertEquals(10, num_dead);
691:
692: queue.close(false); // will cause all threads still blocking on peek() to return
693: }
694:
695: /**
696: * Times how long it takes to add and remove 1000000 elements concurrently (1 reader, 1 writer)
697: */
698: public void testConcurrentAddRemove() {
699: final long NUM = 1000000;
700: long num_received = 0;
701: Object ret;
702: long start, stop;
703:
704: start = System.currentTimeMillis();
705:
706: new Thread() {
707: public void run() {
708: for (int i = 0; i < NUM; i++) {
709: try {
710: queue.add(new Object());
711: } catch (QueueClosedException e) {
712: }
713: }
714: }
715: }.start();
716:
717: while (num_received < NUM) {
718: try {
719: ret = queue.remove();
720: if (ret != null)
721: num_received++;
722: } catch (QueueClosedException e) {
723: e.printStackTrace();
724: fail();
725: }
726: }
727: assertEquals(NUM, num_received);
728: stop = System.currentTimeMillis();
729: System.out.println("time to add/remove " + NUM + " elements: "
730: + (stop - start));
731: }
732:
733: /** Has multiple threads add(), remove() and peek() elements to/from the queue */
734: public void testConcurrentAccess() {
735: final int NUM_THREADS = 10;
736: final int INTERVAL = 20000;
737:
738: Writer[] writers = new Writer[NUM_THREADS];
739: Reader[] readers = new Reader[NUM_THREADS];
740: int[] writes = new int[NUM_THREADS];
741: int[] reads = new int[NUM_THREADS];
742: long total_reads = 0, total_writes = 0;
743:
744: for (int i = 0; i < writers.length; i++) {
745: readers[i] = new Reader(i, reads);
746: readers[i].start();
747: writers[i] = new Writer(i, writes);
748: writers[i].start();
749: }
750:
751: Util.sleep(INTERVAL);
752:
753: System.out.println("current queue size=" + queue.size());
754:
755: for (int i = 0; i < writers.length; i++) {
756: writers[i].stopThread();
757: }
758:
759: for (int i = 0; i < readers.length; i++) {
760: readers[i].stopThread();
761: }
762:
763: queue.close(false); // will cause all threads still blocking on peek() to return
764:
765: System.out.println("current queue size=" + queue.size());
766:
767: for (int i = 0; i < writers.length; i++) {
768: try {
769: writers[i].join(300);
770: readers[i].join(300);
771: } catch (Exception ex) {
772: System.err.println(ex);
773: }
774: }
775:
776: for (int i = 0; i < writes.length; i++) {
777: System.out.println("Thread #" + i + ": " + writes[i]
778: + " writes, " + reads[i] + " reads");
779: total_writes += writes[i];
780: total_reads += reads[i];
781: }
782: System.out.println("total writes=" + total_writes
783: + ", total_reads=" + total_reads + ", diff="
784: + Math.abs(total_writes - total_reads));
785: }
786:
787: class AddOneItem extends Thread {
788: Long retval = null;
789: int rank = 0;
790: int iteration = 0;
791:
792: AddOneItem(int rank, int iteration) {
793: super ("AddOneItem thread #" + rank);
794: this .rank = rank;
795: this .iteration = iteration;
796: setDaemon(true);
797: }
798:
799: public void run() {
800: try {
801: for (int i = 0; i < iteration; i++) {
802: queue.add(new Long(rank));
803: // Util.sleepRandom(1);
804: // System.out.println("Thread #" + rank + " added element (" + rank + ")");
805: }
806: } catch (QueueClosedException closed) {
807: System.err.println("Thread #" + rank
808: + ": queue was closed");
809: }
810: }
811:
812: }
813:
814: class RemoveOneItem extends Thread {
815: Long retval = null;
816: int rank = 0;
817:
818: RemoveOneItem(int rank) {
819: super ("RemoveOneItem thread #" + rank);
820: this .rank = rank;
821: setDaemon(true);
822: }
823:
824: public void run() {
825: try {
826: retval = (Long) queue.remove();
827: // System.out.println("Thread #" + rank + " removed element (" + retval + ")");
828: } catch (QueueClosedException closed) {
829: System.err.println("Thread #" + rank
830: + ": queue was closed");
831: }
832: }
833:
834: Long getRetval() {
835: return retval;
836: }
837: }
838:
839: class RemoveOneItemWithTimeout extends Thread {
840: Long retval = null;
841: int rank = 0;
842: long timeout = 0;
843:
844: RemoveOneItemWithTimeout(int rank, long timeout) {
845: super ("RemoveOneItem thread #" + rank);
846: this .rank = rank;
847: this .timeout = timeout;
848: setDaemon(true);
849: }
850:
851: public void run() {
852: boolean finished = false;
853: while (!finished) {
854: try {
855: retval = (Long) queue.remove(timeout);
856: // System.out.println("Thread #" + rank + " removed element (" + retval + ")");
857: finished = true;
858: } catch (QueueClosedException closed) {
859: System.err.println("Thread #" + rank
860: + ": queue was closed");
861: finished = true;
862: } catch (TimeoutException e) {
863: }
864: }
865: }
866:
867: Long getRetval() {
868: return retval;
869: }
870: }
871:
872: class Writer extends Thread {
873: int rank = 0;
874: int num_writes = 0;
875: boolean running = true;
876: int[] writes = null;
877:
878: Writer(int i, int[] writes) {
879: super ("WriterThread");
880: rank = i;
881: this .writes = writes;
882: setDaemon(true);
883: }
884:
885: public void run() {
886: while (running) {
887: try {
888: queue.add(new Long(System.currentTimeMillis()));
889: num_writes++;
890: } catch (QueueClosedException closed) {
891: running = false;
892: } catch (Throwable t) {
893: System.err
894: .println("QueueTest.Writer.run(): exception="
895: + t);
896: }
897: }
898: writes[rank] = num_writes;
899: }
900:
901: void stopThread() {
902: running = false;
903: }
904: }
905:
906: class Reader extends Thread {
907: int rank;
908: int num_reads = 0;
909: int[] reads = null;
910: boolean running = true;
911:
912: Reader(int i, int[] reads) {
913: super ("ReaderThread");
914: rank = i;
915: this .reads = reads;
916: setDaemon(true);
917: }
918:
919: public void run() {
920: Long el;
921:
922: while (running) {
923: try {
924: el = (Long) queue.remove();
925: if (el == null) { // @remove
926: System.out
927: .println("QueueTest.Reader.run(): peek() returned null element. "
928: + "queue.size()="
929: + queue.size()
930: + ", queue.closed()="
931: + queue.closed());
932: }
933: assertNotNull(el);
934: num_reads++;
935: } catch (QueueClosedException closed) {
936: running = false;
937: } catch (Throwable t) {
938: System.err
939: .println("QueueTest.Reader.run(): exception="
940: + t);
941: }
942: }
943: reads[rank] = num_reads;
944: }
945:
946: void stopThread() {
947: running = false;
948: }
949:
950: }
951:
952: public static void main(String[] args) {
953: String[] testCaseName = { QueueTest.class.getName() };
954: junit.textui.TestRunner.main(testCaseName);
955: }
956:
957: }
|