001: /*
002:
003: * <copyright>
004: *
005: * Copyright 2002-2007 BBNT Solutions, LLC
006: * under sponsorship of the Defense Advanced Research Projects
007: * Agency (DARPA).
008: *
009: * You can redistribute this software and/or modify it under the
010: * terms of the Cougaar Open Source License as published on the
011: * Cougaar Open Source Website (www.cougaar.org).
012: *
013: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
014: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
015: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
016: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
017: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
018: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
019: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
020: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
021: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
022: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
023: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
024: *
025: * </copyright>
026:
027: */
028:
029: package org.cougaar.qos.qrs;
030:
031: import java.util.HashMap;
032: import java.util.HashSet;
033: import java.util.Map;
034: import java.util.Set;
035:
036: import org.cougaar.util.CircularQueue;
037:
038: abstract public class SimpleQueueingDataFeed extends AbstractDataFeed {
039: private final Map<String, Set<DataFeedListener>> listeners;
040: private final Map<String, DataValue> data;
041: private final CircularQueue<String> queue;
042: private final Runnable notifier;
043:
044: private class Notifier implements Runnable {
045: public void run() {
046: String key = null;
047: DataValue value = null;
048: while (true) {
049: key = nextKey();
050: if (key == null) {
051: break;
052: }
053: value = lookup(key);
054: if (value == null) {
055: continue;
056: }
057:
058: notifyListeners(key, value);
059: }
060: }
061: }
062:
063: protected Runnable makeNotifier() {
064: return new Notifier();
065: }
066:
067: protected SimpleQueueingDataFeed() {
068: listeners = new HashMap<String, Set<DataFeedListener>>();
069: data = new HashMap<String, DataValue>();
070: queue = new CircularQueue<String>();
071: notifier = makeNotifier();
072: }
073:
074: protected String nextKey() {
075: String next = null;
076: synchronized (queue) {
077: if (!queue.isEmpty()) {
078: next = queue.next();
079: }
080: }
081: return next;
082: }
083:
084: protected boolean isEmpty() {
085: synchronized (queue) {
086: return queue.isEmpty();
087: }
088: }
089:
090: protected void dispatch() {
091: notifier.run();
092: }
093:
094: protected Runnable getNotifier() {
095: return notifier;
096: }
097:
098: public void removeListenerForKey(DataFeedListener listener,
099: String key) {
100: Set<DataFeedListener> key_listeners = listeners.get(key);
101: if (key_listeners != null) {
102: synchronized (key_listeners) {
103: key_listeners.remove(listener);
104: }
105: }
106: }
107:
108: public void addListenerForKey(DataFeedListener listener, String key) {
109: Set<DataFeedListener> key_listeners = null;
110: synchronized (listeners) {
111: key_listeners = listeners.get(key);
112: if (key_listeners == null) {
113: key_listeners = new HashSet<DataFeedListener>();
114: listeners.put(key, key_listeners);
115: }
116: }
117: synchronized (key_listeners) {
118: key_listeners.add(listener);
119: }
120: }
121:
122: protected void notifyListeners(String key, DataValue value) {
123: Set<DataFeedListener> key_listeners = listeners.get(key);
124: if (key_listeners != null) {
125: synchronized (key_listeners) {
126: for (DataFeedListener listener : key_listeners) {
127: listener.newData(this , key, value);
128: }
129: }
130: }
131: }
132:
133: public DataValue lookup(String key) {
134: return data.get(key);
135: }
136:
137: public <T> void newData(String key, T raw,
138: DataInterpreter<T> interpreter) {
139: synchronized (data) {
140: DataValue old_value = lookup(key);
141: DataValue new_value;
142: double credibility;
143: if (interpreter != null) {
144: credibility = interpreter.getCredibility(raw);
145: } else {
146: credibility = ((DataValue) raw).getCredibility();
147: }
148: if (old_value == null
149: || old_value.getCredibility() <= credibility) {
150: if (interpreter != null) {
151: new_value = interpreter.getDataValue(raw);
152: } else {
153: new_value = (DataValue) raw;
154: }
155: data.put(key, new_value);
156: } else {
157: return;
158: }
159: }
160:
161: synchronized (queue) {
162: if (!queue.contains(key)) {
163: queue.add(key);
164: // XXX: Should the dispatch really keep the queue locked?
165: dispatch();
166: }
167: }
168: }
169:
170: }
|