01: /*
02: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
03: */
04: package com.tctest;
05:
06: import com.tctest.util.ThreadUtil;
07:
08: import java.util.ArrayList;
09: import java.util.Iterator;
10: import java.util.List;
11: import java.util.concurrent.BlockingQueue;
12: import java.util.concurrent.LinkedBlockingQueue;
13:
14: public class QueueMultiplexer {
15: private static final int MAX_BUFFER = 10;
16:
17: private transient BlockingQueue<Object> inputQueue;
18:
19: private List<BlockingQueue<Object>> outputQueues = new ArrayList<BlockingQueue<Object>>();
20:
21: public BlockingQueue<Object> getNewOutputQueue() {
22: synchronized (outputQueues) {
23: BlockingQueue<Object> q = new LinkedBlockingQueue<Object>();
24: outputQueues.add(q);
25: outputQueues.notify();
26: return q;
27: }
28: }
29:
30: private class OutputQueueWriter implements Runnable {
31: private BlockingQueue<Object> outputQueue;
32:
33: public OutputQueueWriter(BlockingQueue<Object> outputQueue) {
34: this .outputQueue = outputQueue;
35: }
36:
37: public void run() {
38: ArrayList<Object> l = new ArrayList<Object>(MAX_BUFFER);
39:
40: while (true) {
41: try {
42: l.clear();
43: Object item = inputQueue.take();
44: inputQueue.drainTo(l, MAX_BUFFER - 1);
45: l.add(0, item);
46: outputQueue.addAll(l);
47: } catch (InterruptedException ie) {
48: //
49: }
50: }
51: }
52: }
53:
54: public void putAll(Object item) {
55: synchronized (outputQueues) {
56: for (Iterator<BlockingQueue<Object>> i = outputQueues
57: .iterator(); i.hasNext();) {
58: BlockingQueue<Object> queue = i.next();
59: queue.offer(item);
60: }
61: }
62: }
63:
64: private class WaitForReaders implements Runnable {
65: public void run() {
66: int readers = 0;
67:
68: System.out.println("Waiting for readers...");
69: while (true) {
70: BlockingQueue<Object> q = null;
71:
72: synchronized (outputQueues) {
73: while (outputQueues.size() == readers) {
74: try {
75: outputQueues.wait();
76: } catch (InterruptedException ie) {
77: //
78: }
79: }
80: // an output queue was added, so spin up a thread
81: // to write to it
82: q = outputQueues.get(readers++);
83: }
84:
85: ThreadUtil.startDaemonThread(new OutputQueueWriter(q));
86: System.out.println("Started queue reader");
87: }
88: }
89: }
90:
91: public void start(BlockingQueue<Object> queue) {
92: synchronized (this ) {
93: this .inputQueue = queue;
94: }
95: ThreadUtil.startDaemonThread(new WaitForReaders());
96: }
97: }
|