001: /* ====================================================================
002: * The LateralNZ Software License, Version 1.0
003: *
004: * Copyright (c) 2003 LateralNZ. All rights reserved.
005: *
006: * Redistribution and use in source and binary forms, with or without
007: * modification, are permitted provided that the following conditions
008: * are met:
009: *
010: * 1. Redistributions of source code must retain the above copyright
011: * notice, this list of conditions and the following disclaimer.
012: *
013: * 2. Redistributions in binary form must reproduce the above copyright
014: * notice, this list of conditions and the following disclaimer in
015: * the documentation and/or other materials provided with the
016: * distribution.
017: *
018: * 3. The end-user documentation included with the redistribution,
019: * if any, must include the following acknowledgment:
020: * "This product includes software developed by
021: * LateralNZ (http://www.lateralnz.org/) and other third parties."
022: * Alternately, this acknowledgment may appear in the software itself,
023: * if and wherever such third-party acknowledgments normally appear.
024: *
025: * 4. The names "LateralNZ" must not be used to endorse or promote
026: * products derived from this software without prior written
027: * permission. For written permission, please
028: * contact oss@lateralnz.org.
029: *
030: * 5. Products derived from this software may not be called "Panther",
031: * or "Lateral" or "LateralNZ", nor may "PANTHER" or "LATERAL" or
032: * "LATERALNZ" appear in their name, without prior written
033: * permission of LateralNZ.
034: *
035: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
036: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
037: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
038: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
039: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
040: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
041: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
042: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
043: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
044: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
045: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
046: * SUCH DAMAGE.
047: * ====================================================================
048: *
049: * This software consists of voluntary contributions made by many
050: * individuals on behalf of LateralNZ. For more
051: * information on Lateral, please see http://www.lateralnz.com/ or
052: * http://www.lateralnz.org
053: *
054: */
055: package org.lateralnz.common.util;
056:
057: import java.io.Serializable;
058: import java.util.ArrayList;
059:
060: import org.lateralnz.common.wrapper.IntHolder;
061:
062: /**
063: * a simple queue with a synchronized getter that returns (and removes)
064: * the first object in the list (ie. FIFO). This has an option to support notify()/wait()
065: *
066: * @author J R Briggs
067: */
068: public class Queue implements Serializable, Constants {
069: private InnerArrayList queue = new InnerArrayList();
070: private IntHolder messageIdx = new IntHolder(0);
071: private int cleanupSize = 100;
072: private boolean blocking = false;
073:
074: public Queue() {
075: this (100, false);
076: }
077:
078: public Queue(boolean blocking) {
079: this (100, blocking);
080: }
081:
082: public Queue(int cleanupSize) {
083: this (cleanupSize, false);
084: }
085:
086: public Queue(int cleanupSize, boolean blocking) {
087: this .cleanupSize = cleanupSize;
088: this .blocking = blocking;
089: }
090:
091: public int getSize() {
092: return queue.size() - messageIdx.value;
093: }
094:
095: public synchronized void add(Object obj) {
096: queue.add(obj);
097:
098: if (messageIdx.value > cleanupSize) {
099: int removed = queue.remove(0, cleanupSize);
100: messageIdx.value -= removed;
101: }
102:
103: if (blocking) {
104: try {
105: notify();
106: } catch (IllegalMonitorStateException itme) {
107: System.err.println("illegal monitor state");
108: }
109: }
110: }
111:
112: private boolean available() {
113: if (messageIdx.value >= 0 && messageIdx.value < queue.size()) {
114: return true;
115: } else {
116: return false;
117: }
118: }
119:
120: public synchronized Object get() throws InterruptedException {
121: if (blocking) {
122: while (!available()) {
123: wait();
124: }
125: }
126:
127: Object rtn = null;
128: int idx = messageIdx.value;
129:
130: if (available()) {
131: messageIdx.value++;
132: rtn = queue.get(idx);
133: }
134:
135: return rtn;
136: }
137:
138: /**
139: * an array list that supports removing a range of values (for performance reasons)
140: */
141: private class InnerArrayList extends ArrayList {
142: /**
143: * remove a range beginning at start (inclusive) and ending at end (exclusive)
144: */
145: public int remove(int start, int end) {
146: if (start > size()) {
147: return 0;
148: }
149: if (end > size()) {
150: end = size();
151: }
152:
153: this .removeRange(start, end);
154: return end - start;
155: }
156: }
157:
158: // simple test
159: public static final void main(String[] args) throws Exception {
160: final Queue queue = new Queue(true);
161: final ArrayList test = new ArrayList();
162:
163: final int max = 10000;
164:
165: Thread t = new Thread() {
166: public void run() {
167: java.util.Random rand = new java.util.Random();
168: for (int i = 0; i < max; i++) {
169: queue.add("test" + i);
170: try {
171: sleep(rand.nextInt(5));
172: } catch (Exception e) {
173: }
174: }
175: }
176: };
177:
178: for (int i = 0; i < 5; i++) {
179: final int idx = i;
180: Thread l = new Thread() {
181: public void run() {
182: java.util.Random rand = new java.util.Random();
183: while (true) {
184: try {
185: System.out
186: .println("#" + idx + " listening");
187: Object obj = queue.get();
188: if (obj == null) {
189: System.out.println(">>ERROR<<");
190: System.exit(1);
191: }
192: System.out.println("#" + idx + " got "
193: + obj);
194: test.add(obj);
195: if (test.size() >= max) {
196: System.exit(0);
197: }
198: sleep(rand.nextInt(10));
199: } catch (Exception e) {
200: }
201: }
202: }
203: };
204: l.start();
205: }
206:
207: t.start();
208: }
209: }
|