001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.yp;
028:
029: import java.util.ArrayList;
030: import java.util.HashMap;
031:
/**
 * Implement an asynchronous wait/notify map with value pass-through,
 * selected by an opaque key.
 * <p>
 * A key is created with {@link #getKey()}. One thread blocks in
 * {@link #waitFor(Object, long)} until another thread calls
 * {@link #trigger(Object, Object)} with the same key; the value handed to
 * <code>trigger</code> is returned from <code>waitFor</code>.
 * All access to the key table is synchronized on the table itself.
 **/
class WaitQueue {
  /** Next key value to hand out; only touched while holding the lock on <code>selects</code>. **/
  private long counter = 0L;

  /** Outstanding keys mapped to their Waiter; also used as the lock object for this queue. **/
  private final HashMap selects = new HashMap(11); // assume not too many at a time

  /**
   * Construct a key to be used in this wait queue and register a fresh
   * (untriggered) waiter for it.
   * @return a new key, distinct from every key previously returned by this queue
   **/
  Object getKey() {
    synchronized (selects) {
      Object key = new Long(counter++);
      selects.put(key, new Waiter());
      return key;
    }
  }

  /**
   * Activate any thread(s) which are waiting for a response from the key,
   * and record <code>value</code> so that later waiters on the same key
   * return immediately with it.  Does nothing if the key is unknown.
   * @param key The key as returned by {@link #getKey()}
   * @param value The value to pass through to the waiter(s); may be null
   **/
  void trigger(Object key, Object value) {
    Waiter waiter;
    synchronized (selects) {
      waiter = (Waiter) selects.get(key);
    }
    if (waiter == null) {
      return; // bail if no waiter
    }
    // Signal outside the table lock so other keys are not held up.
    waiter.trigger(value);
  }

  /**
   * Block the current thread until {@link #trigger(Object, Object)} has been
   * called on the referenced key, or until the timeout elapses.
   * Returns immediately if the key is unknown or has previously been triggered.
   * When this method returns normally (triggered or timed out) the key is
   * removed from the queue, so a later trigger or wait on it is a no-op.
   * @throws InterruptedException if the wait is interrupted. In this case,
   * the trigger is <em>not</em> cleared.
   * @param key The key as returned by {@link #getKey()}
   * @param timeout How long to wait in milliseconds, or 0 (forever)
   * @return the value passed to trigger, or null if the key is unknown or
   * the wait timed out before a trigger arrived
   **/
  Object waitFor(Object key, long timeout)
    throws InterruptedException {
    Waiter waiter;
    synchronized (selects) {
      waiter = (Waiter) selects.get(key);
    }
    if (waiter == null) {
      return null; // bail if no waiter
    }

    // non-null waiter: pass control to it
    Object value = waiter.waitFor(timeout);

    // when done, clear it (skipped if the wait above threw)
    synchronized (selects) {
      selects.remove(key);
    }

    return value;
  }

  /** alias for waitFor(key,0L);
   **/
  Object waitFor(Object key) throws InterruptedException {
    return waitFor(key, 0L);
  }

  /**
   * Return true IFF waitFor(key) call would have blocked for some amount of time.
   * An unknown key never blocks, so it yields false.
   * @param key The key as returned by {@link #getKey()}
   **/
  boolean wouldBlock(Object key) {
    Waiter waiter;
    synchronized (selects) {
      waiter = (Waiter) selects.get(key);
    }
    if (waiter == null) {
      return false;
    }

    // non-null waiter: pass control to it
    return waiter.wouldBlock();
  }

  /**
   * One-shot latch carrying a value.  All state is guarded by the
   * Waiter's own monitor.
   **/
  private static class Waiter {
    /** Set once trigger() has been called; never reset. **/
    private boolean triggered = false;
    /** Value supplied to trigger(); null until then. **/
    private Object value = null;

    /**
     * Wait until triggered or until <code>timeout</code> milliseconds have
     * passed (0 means wait forever).
     * @return the triggered value, or null if the timeout expired first
     * @throws InterruptedException if the waiting thread is interrupted
     **/
    synchronized Object waitFor(long timeout)
      throws InterruptedException {
      if (triggered) {
        return value;
      }
      if (timeout == 0L) {
        // Loop: Object.wait may return without a notify (spurious wakeup).
        while (!triggered) {
          this.wait();
        }
      } else {
        long start = System.currentTimeMillis();
        long remaining = timeout;
        do {
          // A negative timeout still raises IllegalArgumentException here.
          this.wait(remaining);
          // Measure elapsed time rather than computing an absolute deadline,
          // so a very large timeout cannot overflow.
          remaining = timeout - (System.currentTimeMillis() - start);
        } while (!triggered && remaining > 0L);
      }
      return value;
    }

    /** Record the value and wake every thread blocked in waitFor. **/
    synchronized void trigger(Object value) {
      triggered = true;
      this.value = value;
      // notifyAll, not notify: several threads may be waiting on one key.
      this.notifyAll();
    }

    /** @return true while trigger() has not yet been called **/
    synchronized boolean wouldBlock() {
      return !triggered;
    }
  }

  /**
   * Manual demo: creates ten keys half a second apart, each triggered by its
   * own thread after a 30 second sleep, then waits for and prints each result.
   **/
  public static void main(String[] arg) {
    ArrayList keys = new ArrayList();
    final WaitQueue wq = new WaitQueue();

    for (int i = 0; i < 10; i++) {
      final Object k = wq.getKey();
      final Object r = new Integer(i);
      new Thread(new Runnable() {
          public void run() {
            try {
              Thread.sleep(30 * 1000);
            } catch (Exception e) {
              e.printStackTrace();
            }
            wq.trigger(k, r);
          }
        }).start();
      keys.add(k);

      // stagger the trigger threads
      try {
        Thread.sleep(500);
      } catch (Exception e) {
        e.printStackTrace();
      }

    }

    for (int i = 0; i < 10; i++) {
      try {
        Object v = wq.waitFor(keys.get(i));
        System.out.println("result " + i + " = " + v);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

}
|