001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.commons.io;
018:
019: import java.io.ByteArrayInputStream;
020: import java.io.IOException;
021: import java.io.InputStream;
022: import java.io.OutputStream;
023: import java.util.HashMap;
024: import java.util.Iterator;
025: import java.util.Random;
026: import junit.framework.TestCase;
027:
028: import org.apache.commons.io.output.ByteArrayOutputStream;
029: import org.apache.commons.io.output.DemuxOutputStream;
030: import org.apache.commons.io.input.DemuxInputStream;
031:
032: /**
033: * Basic unit tests for the multiplexing streams.
034: *
035: * @author <a href="mailto:peter@apache.org">Peter Donald</a>
036: */
037: public class DemuxTestCase extends TestCase {
038: private static final String T1 = "Thread1";
039: private static final String T2 = "Thread2";
040: private static final String T3 = "Thread3";
041: private static final String T4 = "Thread4";
042:
043: private static final String DATA1 = "Data for thread1";
044: private static final String DATA2 = "Data for thread2";
045: private static final String DATA3 = "Data for thread3";
046: private static final String DATA4 = "Data for thread4";
047:
048: private static Random c_random = new Random();
049: private HashMap m_outputMap = new HashMap();
050: private HashMap m_threadMap = new HashMap();
051:
052: public DemuxTestCase(String name) {
053: super (name);
054: }
055:
056: private String getOutput(String threadName) throws IOException {
057: ByteArrayOutputStream output = (ByteArrayOutputStream) m_outputMap
058: .get(threadName);
059: assertNotNull("getOutput()", output);
060:
061: return output.toString();
062: }
063:
064: private String getInput(String threadName) throws IOException {
065: ReaderThread thread = (ReaderThread) m_threadMap
066: .get(threadName);
067: assertNotNull("getInput()", thread);
068:
069: return thread.getData();
070: }
071:
072: private void doStart() throws Exception {
073: Iterator iterator = m_threadMap.keySet().iterator();
074: while (iterator.hasNext()) {
075: String name = (String) iterator.next();
076: Thread thread = (Thread) m_threadMap.get(name);
077: thread.start();
078: }
079: }
080:
081: private void doJoin() throws Exception {
082: Iterator iterator = m_threadMap.keySet().iterator();
083: while (iterator.hasNext()) {
084: String name = (String) iterator.next();
085: Thread thread = (Thread) m_threadMap.get(name);
086: thread.join();
087: }
088: }
089:
090: private void startWriter(String name, String data,
091: DemuxOutputStream demux) throws Exception {
092: ByteArrayOutputStream output = new ByteArrayOutputStream();
093: m_outputMap.put(name, output);
094: WriterThread thread = new WriterThread(name, data, output,
095: demux);
096: m_threadMap.put(name, thread);
097: }
098:
099: private void startReader(String name, String data,
100: DemuxInputStream demux) throws Exception {
101: ByteArrayInputStream input = new ByteArrayInputStream(data
102: .getBytes());
103: ReaderThread thread = new ReaderThread(name, input, demux);
104: m_threadMap.put(name, thread);
105: }
106:
107: public void testOutputStream() throws Exception {
108: DemuxOutputStream output = new DemuxOutputStream();
109: startWriter(T1, DATA1, output);
110: startWriter(T2, DATA2, output);
111: startWriter(T3, DATA3, output);
112: startWriter(T4, DATA4, output);
113:
114: doStart();
115: doJoin();
116:
117: assertEquals("Data1", DATA1, getOutput(T1));
118: assertEquals("Data2", DATA2, getOutput(T2));
119: assertEquals("Data3", DATA3, getOutput(T3));
120: assertEquals("Data4", DATA4, getOutput(T4));
121: }
122:
123: public void testInputStream() throws Exception {
124: DemuxInputStream input = new DemuxInputStream();
125: startReader(T1, DATA1, input);
126: startReader(T2, DATA2, input);
127: startReader(T3, DATA3, input);
128: startReader(T4, DATA4, input);
129:
130: doStart();
131: doJoin();
132:
133: assertEquals("Data1", DATA1, getInput(T1));
134: assertEquals("Data2", DATA2, getInput(T2));
135: assertEquals("Data3", DATA3, getInput(T3));
136: assertEquals("Data4", DATA4, getInput(T4));
137: }
138:
139: private static class ReaderThread extends Thread {
140: private StringBuffer m_buffer = new StringBuffer();
141: private InputStream m_input;
142: private DemuxInputStream m_demux;
143:
144: ReaderThread(String name, InputStream input,
145: DemuxInputStream demux) {
146: super (name);
147: m_input = input;
148: m_demux = demux;
149: }
150:
151: public String getData() {
152: return m_buffer.toString();
153: }
154:
155: public void run() {
156: m_demux.bindStream(m_input);
157:
158: try {
159: int ch = m_demux.read();
160: while (-1 != ch) {
161: //System.out.println( "Reading: " + (char)ch );
162: m_buffer.append((char) ch);
163:
164: int sleepTime = Math.abs(c_random.nextInt() % 10);
165: Thread.sleep(sleepTime);
166: ch = m_demux.read();
167: }
168: } catch (Exception e) {
169: e.printStackTrace();
170: }
171: }
172: }
173:
174: private static class WriterThread extends Thread {
175: private byte[] m_data;
176: private OutputStream m_output;
177: private DemuxOutputStream m_demux;
178:
179: WriterThread(String name, String data, OutputStream output,
180: DemuxOutputStream demux) {
181: super (name);
182: m_output = output;
183: m_demux = demux;
184: m_data = data.getBytes();
185: }
186:
187: public void run() {
188: m_demux.bindStream(m_output);
189: for (int i = 0; i < m_data.length; i++) {
190: try {
191: //System.out.println( "Writing: " + (char)m_data[ i ] );
192: m_demux.write(m_data[i]);
193: int sleepTime = Math.abs(c_random.nextInt() % 10);
194: Thread.sleep(sleepTime);
195: } catch (Exception e) {
196: e.printStackTrace();
197: }
198: }
199: }
200: }
201: }
|