001: /*
002:
003: * <copyright>
004: *
005: * Copyright 2002-2007 BBNT Solutions, LLC
006: * under sponsorship of the Defense Advanced Research Projects
007: * Agency (DARPA).
008: *
009: * You can redistribute this software and/or modify it under the
010: * terms of the Cougaar Open Source License as published on the
011: * Cougaar Open Source Website (www.cougaar.org).
012: *
013: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
014: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
015: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
016: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
017: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
018: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
019: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
020: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
021: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
022: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
023: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
024: *
025: * </copyright>
026:
027: */
028:
029: package org.cougaar.qos.qrs;
030:
031: import java.util.ArrayList;
032: import java.util.HashMap;
033: import java.util.HashSet;
034: import java.util.List;
035: import java.util.Map;
036: import java.util.Set;
037:
038: import org.cougaar.qos.ResourceStatus.RSSSubscriberPOA;
039: import org.cougaar.qos.ResourceStatus.ResourceNode;
040: import org.cougaar.qos.ResourceStatus.ResourceStatusService;
041: import org.cougaar.util.log.Logger;
042:
043: /**
044: * RelayDataFeeds are designed to move data from one RSS to another. This first
045: * one is simple but inefficient (the same formulas will be calculated at
046: * multiple levels) and not all particular about what it passes along
047: * (everything).
048: */
049:
050: public class PromiscuousRelayDataFeed extends AbstractDataFeed
051: implements Constants {
052: private final Map<String, Set<DataFeedListener>> listeners;
053: private final Map<String, DataValue> data;
054: private final Set<String> keys;
055: private final List<ResourceStatusService> services;
056:
057: public PromiscuousRelayDataFeed() {
058: super ();
059: listeners = new HashMap<String, Set<DataFeedListener>>();
060: data = new HashMap<String, DataValue>();
061: keys = new HashSet<String>();
062: services = new ArrayList<ResourceStatusService>();
063: }
064:
065: public void addService(ResourceStatusService rss) {
066: synchronized (services) {
067: services.add(rss);
068: }
069: synchronized (keys) {
070: for (String key : keys) {
071: Subscriber subscriber = new Subscriber(key, rss);
072: try {
073: CorbaUtils.poa.activate_object(subscriber);
074: subscriber.connect();
075: } catch (Exception ex) {
076: Logger logger = Logging
077: .getLogger(PromiscuousRelayDataFeed.class);
078: logger.error(null, ex);
079: }
080: }
081: }
082: }
083:
084: public void removeService(ResourceStatusService rss) {
085: synchronized (services) {
086: services.remove(rss);
087: // much more to do here.
088: }
089: }
090:
091: // Caller should synchronize on keys
092: private void makeNewSubscribersForKey(String key) {
093: Subscriber subscriber;
094: synchronized (services) {
095: for (ResourceStatusService rss : services) {
096: subscriber = new Subscriber(key, rss);
097: try {
098: CorbaUtils.poa.activate_object(subscriber);
099: subscriber.connect();
100: } catch (Exception ex) {
101: Logger logger = Logging
102: .getLogger(PromiscuousRelayDataFeed.class);
103: logger.error(null, ex);
104: }
105: }
106: }
107: }
108:
109: public void removeListenerForKey(DataFeedListener listener,
110: String key) {
111: Set<DataFeedListener> key_listeners = listeners.get(key);
112: if (key_listeners != null) {
113: synchronized (key_listeners) {
114: key_listeners.remove(listener);
115: }
116: }
117: }
118:
119: public void addListenerForKey(DataFeedListener listener, String key) {
120: Set<DataFeedListener> key_listeners = null;
121: synchronized (listeners) {
122: key_listeners = listeners.get(key);
123: if (key_listeners == null) {
124: key_listeners = new HashSet<DataFeedListener>();
125: listeners.put(key, key_listeners);
126: }
127: }
128: synchronized (key_listeners) {
129: key_listeners.add(listener);
130: }
131: }
132:
133: private void notifyListeners(String key, DataValue value) {
134: Set<DataFeedListener> key_listeners = listeners.get(key);
135: if (key_listeners != null) {
136: synchronized (key_listeners) {
137: for (DataFeedListener listener : key_listeners) {
138: listener.newData(this , key, value);
139: }
140: }
141: }
142: }
143:
144: public DataValue lookup(String key) {
145: synchronized (keys) {
146: if (!keys.contains(key)) {
147: makeNewSubscribersForKey(key);
148: keys.add(key);
149: }
150: }
151: return data.get(key);
152: }
153:
154: private void newData(final String key,
155: org.cougaar.qos.ResourceStatus.DataValue corba_value) {
156: synchronized (data) {
157: DataValue old_value = lookup(key);
158: double credibility = corba_value.credibility;
159: if (old_value == null
160: || old_value.getCredibility() <= credibility) {
161: DataValue new_value = new DataValue(corba_value);
162: data.put(key, new_value);
163: notifyListeners(key, new_value);
164: }
165: }
166:
167: }
168:
169: private class Subscriber extends RSSSubscriberPOA {
170: String key;
171: ResourceStatusService rss;
172:
173: Subscriber(String key, ResourceStatusService rss) {
174: this .key = key;
175: this .rss = rss;
176: }
177:
178: void connect() {
179: ResourceNode scope_ref = new ResourceNode();
180: String[] parameters = { key };
181: scope_ref.kind = "Integrater";
182: scope_ref.parameters = parameters;
183: ResourceNode formula_ref = new ResourceNode();
184: formula_ref.kind = "Formula";
185: formula_ref.parameters = new String[0];
186: ResourceNode[] description = { scope_ref, formula_ref };
187: rss.unqualifiedSubscribe(_this (), description, 0);
188: }
189:
190: public void dataUpdate(int callback_id,
191: final org.cougaar.qos.ResourceStatus.DataValue value) {
192: // run this in a different thread?
193: Runnable task = new Runnable() {
194: public void run() {
195: newData(key, value);
196: }
197: };
198: RSSUtils.schedule(task, 0);
199: }
200: }
201: }
|