001: /* ====================================================================
002: The Jicarilla Software License
003:
004: Copyright (c) 2003 Leo Simons.
005: All rights reserved.
006:
007: Permission is hereby granted, free of charge, to any person obtaining
008: a copy of this software and associated documentation files (the
009: "Software"), to deal in the Software without restriction, including
010: without limitation the rights to use, copy, modify, merge, publish,
011: distribute, sublicense, and/or sell copies of the Software, and to
012: permit persons to whom the Software is furnished to do so, subject to
013: the following conditions:
014:
015: The above copyright notice and this permission notice shall be
016: included in all copies or substantial portions of the Software.
017:
018: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
019: EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
020: MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
021: IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
022: CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
023: TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
024: SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
025: ==================================================================== */
026: package org.jicarilla.plumbing.test;
027:
028: import EDU.oswego.cs.dl.util.concurrent.Channel;
029: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
030: import org.jicarilla.plumbing.PostProcessor;
031: import org.jicarilla.plumbing.SimpleSink;
032: import org.jicarilla.plumbing.SimpleStage;
033: import org.jicarilla.plumbing.Sink;
034: import org.jicarilla.plumbing.Sink;
035: import org.jicarilla.plumbing.SimpleStage;
036: import junit.framework.TestCase;
037:
038: import java.util.List;
039:
040: /**
041: * <a href="http://www.junit.org/">JUnit</a> {@link TestCase testcase} for
042: * PostProcessor.
043: *
044: * @author <a href="lsimons at jicarilla dot org">Leo Simons</a>
045: * @version $Id: PostProcessorTestCase.java,v 1.1 2003/11/17 21:11:10 lsimons
046: * Exp $
047: */
048: public class PostProcessorTestCase extends TestCase {
049: public final static class Processor extends PostProcessor {
050: public int processed = 0;
051:
052: public Processor(Channel channel, Sink errorHandler) {
053: super (channel, errorHandler);
054: }
055:
056: public List getStages() {
057: return m_stages;
058: }
059:
060: public void process(Object o) throws InterruptedException {
061: super .process(o);
062: processed++;
063: }
064: }
065:
066: LinkedQueue c;
067: LinkedQueue l;
068: SimpleSink e;
069:
070: public void setUp() {
071: c = new LinkedQueue();
072: ;
073: l = new LinkedQueue();
074: e = new SimpleSink(l);
075: }
076:
077: public void testConstructor() {
078: new PostProcessor(c, e);
079: }
080:
081: public void testAdd() {
082: Processor p = new Processor(c, e);
083: SimpleStage s = new SimpleStage(new LinkedQueue());
084: p.addStage(s);
085:
086: assertEquals(s, p.getStages().get(0));
087:
088: SimpleStage s1 = new SimpleStage(new LinkedQueue());
089: p.addStage(s1);
090: SimpleStage s2 = new SimpleStage(new LinkedQueue());
091: p.addStage(s2);
092: SimpleStage s3 = new SimpleStage(new LinkedQueue());
093: p.addStage(s3);
094: SimpleStage s4 = new SimpleStage(new LinkedQueue());
095: p.addStage(s4);
096:
097: assertEquals(s1, p.getStages().get(1));
098: assertEquals(s2, p.getStages().get(2));
099: assertEquals(s3, p.getStages().get(3));
100: assertEquals(s4, p.getStages().get(4));
101:
102: Throwable t = null;
103: try {
104: p.addStage(null);
105: } catch (AssertionError ae) {
106: t = ae;
107: }
108: assertNotNull(t);
109: }
110:
111: boolean running = true;
112:
113: public void testConcurrentAdd() throws InterruptedException {
114: final Processor p = new Processor(c, e);
115:
116: Thread[] threads = new Thread[5];
117: for (int i = 0; i < 5; i++) {
118: threads[i] = new Thread(new Runnable() {
119:
120: public void run() {
121: while (running) {
122: try {
123: Thread.sleep(100);
124: } catch (InterruptedException e1) {
125: break;
126: }
127: SimpleStage s = new SimpleStage(
128: new LinkedQueue());
129: p.addStage(s);
130: Thread.yield();
131: }
132: }
133: });
134: }
135: for (int i = 0; i < 5; i++) {
136: threads[i].start();
137: }
138:
139: Thread.sleep(1000);
140: running = false;
141:
142: for (int i = 0; i < 5; i++) {
143: threads[i].interrupt();
144: }
145: }
146:
147: public void testPut() throws InterruptedException {
148: final Processor p = new Processor(c, e);
149:
150: p.put(new Object());
151: p.put(new Object());
152: p.put(new Object());
153: p.put(new Object());
154:
155: assertEquals(0, p.processed);
156: }
157:
158: public void testOffer() throws InterruptedException {
159: final Processor p = new Processor(c, e);
160:
161: p.offer(new Object(), 100);
162: p.offer(new Object(), 100);
163: p.offer(new Object(), 100);
164: p.offer(new Object(), 100);
165:
166: assertEquals(0, p.processed);
167: }
168:
169: public void testTake() throws InterruptedException {
170: final Processor p = new Processor(c, e);
171:
172: p.put(new Object());
173: p.put(new Object());
174: p.put(new Object());
175: p.put(new Object());
176:
177: p.take();
178: p.take();
179: p.take();
180: p.take();
181:
182: assertEquals(4, p.processed);
183: }
184:
185: public void testPoll() throws InterruptedException {
186: final Processor p = new Processor(c, e);
187:
188: p.put(new Object());
189: p.put(new Object());
190: p.put(new Object());
191: p.put(new Object());
192:
193: p.poll(100);
194: p.poll(100);
195: p.poll(100);
196: p.poll(100);
197:
198: assertEquals(4, p.processed);
199: }
200:
201: public final static class CountingStage extends SimpleStage {
202: public int put = 0;
203: public int taken = 0;
204:
205: public static int sput = 0;
206: public static int staken = 0;
207:
208: public CountingStage() {
209: super (new LinkedQueue());
210: }
211:
212: public void put(Object o) throws InterruptedException {
213: super .put(o);
214: put++;
215: sput++;
216: }
217:
218: public Object take() throws InterruptedException {
219: taken++;
220: staken++;
221: return super .take();
222: }
223: }
224:
225: public void testProcess() throws InterruptedException {
226: final Processor p = new Processor(c, e);
227:
228: CountingStage s1 = new CountingStage();
229: p.addStage(s1);
230: CountingStage s2 = new CountingStage();
231: p.addStage(s2);
232: CountingStage s3 = new CountingStage();
233: p.addStage(s3);
234: CountingStage s4 = new CountingStage();
235: p.addStage(s4);
236:
237: p.process(new Object());
238: p.process(new Object());
239: p.process(new Object());
240: p.process(new Object());
241:
242: assertEquals(4, s1.put);
243: assertEquals(4, s1.taken);
244: assertEquals(4, s2.put);
245: assertEquals(4, s2.taken);
246: assertEquals(4, s3.put);
247: assertEquals(4, s3.taken);
248: assertEquals(4, s4.put);
249: assertEquals(4, s4.taken);
250: assertEquals(4 * 4, CountingStage.sput);
251: assertEquals(4 * 4, CountingStage.staken);
252: }
253: }
|