001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: // Later this will move elsewhere...
028: package org.cougaar.core.qos.rss;
029:
030: import java.io.InputStream;
031: import java.net.URI;
032: import java.net.URL;
033: import java.util.HashMap;
034: import java.util.Map;
035: import java.util.Observable;
036: import java.util.Observer;
037: import java.util.Properties;
038: import java.util.regex.Matcher;
039: import java.util.regex.Pattern;
040:
041: import org.cougaar.core.component.ServiceBroker;
042: import org.cougaar.core.qos.metrics.DataFeedRegistrationService;
043: import org.cougaar.core.qos.metrics.DataProvider;
044: import org.cougaar.core.qos.metrics.Metric;
045: import org.cougaar.core.qos.metrics.MetricNotificationQualifier;
046: import org.cougaar.core.qos.metrics.MetricsService;
047: import org.cougaar.core.qos.metrics.MetricsUpdateService;
048: import org.cougaar.core.qos.metrics.QosComponent;
049: import org.cougaar.core.qos.metrics.VariableEvaluator;
050: import org.cougaar.core.service.LoggingService;
051: import org.cougaar.core.service.ThreadService;
052: import org.cougaar.core.thread.RunnableQueue;
053: import org.cougaar.qos.ResourceStatus.ResourceDescriptionParseException;
054: import org.cougaar.qos.ResourceStatus.ResourceNode;
055: import org.cougaar.qos.qrs.BoundDataFormula;
056: import org.cougaar.qos.qrs.CorbaUtils;
057: import org.cougaar.qos.qrs.DataFeed;
058: import org.cougaar.qos.qrs.DataFormula;
059: import org.cougaar.qos.qrs.DataValue;
060: import org.cougaar.qos.qrs.NotificationQualifier;
061: import org.cougaar.qos.qrs.NullFormulaException;
062: import org.cougaar.qos.qrs.PathParser;
063: import org.cougaar.qos.qrs.RSS;
064: import org.cougaar.qos.qrs.RSSUtils;
065: import org.cougaar.qos.qrs.SitesDB;
066: import org.cougaar.util.ConfigFinder;
067:
068: /**
069: * This Component is an implementation of MetricsService that uses the RSS for
070: * data lookup. Instantiated by and as child component of the
071: * {@link RSSMetricsServiceProvider}.
072: *
073: * @property org.cougaar.metrics.properties The name of an RSS config file.
074: */
075: public class RSSMetricsServiceImpl extends QosComponent implements
076: MetricsService, DataFeedRegistrationService {
077:
078: // Setup name->class mappings
079:
080: private static final String RSS_PROPERTIES = "org.cougaar.metrics.properties";
081:
082: private LoggingService loggingService;
083: private ThreadService threadService;
084: private RSSMetricsUpdateServiceImpl metricsUpdateService;
085: private RunnableQueue subscriptionQueue;
086: private Map<String, BoundDataFormula> bdfCache;
087:
088: private static class DataValueObserver implements Observer,
089: DataProvider {
090: Observer observer;
091: BoundDataFormula bdf;
092:
093: DataValueObserver(Observer observer, BoundDataFormula bdf) {
094: this .observer = observer;
095: this .bdf = bdf;
096:
097: bdf.addObserver(this );
098: }
099:
100: public void update(Observable observable, Object value) {
101: DataValue dValue = (DataValue) value;
102: DataWrapper wrapper = new DataWrapper(dValue);
103: observer.update(observable, wrapper);
104: }
105:
106: void unsubscribe() {
107: bdf.deleteObserver(this );
108: }
109:
110: public String getPath() {
111: return RSSUtils.pathToString(bdf.getDescription());
112: }
113: }
114:
115: private static class Qualifier implements NotificationQualifier {
116: MetricNotificationQualifier qualifier;
117:
118: Qualifier(MetricNotificationQualifier qualifier) {
119: this .qualifier = qualifier;
120: }
121:
122: public synchronized boolean shouldNotify(DataValue value) {
123: return qualifier.shouldNotify(new DataWrapper(value));
124: }
125:
126: }
127:
/**
 * Creates the component without initializing any of its fields; they are
 * assigned in load().
 */
public RSSMetricsServiceImpl() {
    super();
}
130:
131: public void load() {
132: super .load();
133:
134: ServiceBroker sb = getServiceBroker();
135: loggingService = sb
136: .getService(this , LoggingService.class, null);
137: threadService = sb.getService(this , ThreadService.class, null);
138:
139: subscriptionQueue = new RunnableQueue(threadService,
140: "SubscriptionQueue");
141:
142: MetricsUpdateService mus = sb.getService(this ,
143: MetricsUpdateService.class, null);
144: this .metricsUpdateService = (RSSMetricsUpdateServiceImpl) mus;
145:
146: Properties properties = new Properties();
147: String propertiesURL = System.getProperty(RSS_PROPERTIES);
148: if (propertiesURL != null) {
149: try {
150: URL url = new URL(propertiesURL);
151: java.io.InputStream is = url.openStream();
152: properties.load(is);
153: is.close();
154: } catch (Exception ex) {
155: }
156: }
157:
158: // Make a ServiceBroker available to AgentDS and HostDS.
159: properties.put("ServiceBroker", sb);
160:
161: // Register standard RSS contexts.
162: // The class reference will also run a static
163: // code block to get the local IP address.
164: CorbaUtils.registerContexts();
165:
166: // Register local contexts
167: AgentDS.register();
168: AgentFlowDS.register();
169: DestinationDS.register();
170: NodeDS.register();
171: ServiceDS.register();
172: org.cougaar.core.qos.gossip.GossipIntegraterDS
173: .registerContext();
174:
175: DataFeed feed = null;
176: String feedName = null;
177: feed = metricsUpdateService.getMetricsFeed();
178: feedName = "MetricsDataFeed";
179:
180: RSS.makeInstance(properties);
181: if (feed != null) {
182: feed.setName(feedName);
183: RSS.instance().registerFeed(feed, feedName);
184: }
185:
186: bdfCache = new HashMap<String, BoundDataFormula>();
187:
188: // Used to start this here. Now do it via loadable Component.
189: // AgentHostUpdaterComponent comp = new AgentHostUpdaterComponent();
190: // comp.provideService(sb);
191: }
192:
193: // Data Feed Registration Service
194: public boolean registerFeed(Object feed, String name) {
195: if (feed instanceof DataFeed) {
196: RSS.instance().registerFeed((DataFeed) feed, name);
197: return true;
198: } else {
199: return false;
200: }
201: }
202:
203: public void populateSites(String sitesURLString) {
204: ConfigFinder finder = ConfigFinder.getInstance();
205: SitesDB db = RSS.instance().getSitesDB();
206: try {
207: URI uri = null;
208: try {
209: uri = new URI(sitesURLString);
210: } catch (java.net.URISyntaxException ex) {
211: return;
212: }
213:
214: String scheme = uri.getScheme();
215: String path = uri.getSchemeSpecificPart();
216: if (scheme
217: .equals(ConfigFinderDataFeedComponent.CONFIG_PROTOCOL)) {
218: InputStream stream = finder.open(path);
219: db.populate(stream);
220: } else {
221: URL sitesURL = new URL(sitesURLString);
222: db.populate(sitesURL);
223: }
224: } catch (Exception ex) {
225: loggingService.error("No Sites file loaded: "
226: + ex.toString());
227: }
228: }
229:
230: private static final Pattern pattern = Pattern
231: .compile("\\$\\([^\\)]*\\)");
232:
233: private String evaluateVariables(String path, VariableEvaluator eval) {
234: if (eval == null) {
235: return path;
236: }
237:
238: StringBuffer buf = new StringBuffer();
239: Matcher matcher = pattern.matcher(path);
240: while (matcher.find()) {
241: String match = matcher.group();
242: String var = match.substring(2, match.length() - 1);
243: String val = eval.evaluateVariable(var);
244: if (val == null) {
245: if (loggingService.isErrorEnabled()) {
246: loggingService.error("Path variable " + var
247: + " has no value");
248: }
249: // What's the right way to proceed? For now put
250: // pattern back in.
251: matcher.appendReplacement(buf, match);
252: } else {
253: matcher.appendReplacement(buf, val);
254: }
255: }
256: matcher.appendTail(buf);
257:
258: return buf.toString();
259: }
260:
261: // Metric Service
262: public Metric getValue(String path) {
263: return getValue(path, null, null);
264: }
265:
266: public Metric getValue(String path, Properties qos_tags) {
267: return getValue(path, null, qos_tags);
268: }
269:
270: public Metric getValue(String path, VariableEvaluator evaluator) {
271: return getValue(path, evaluator, null);
272: }
273:
274: // Qos properties not supported yet
275: public Metric getValue(String path, VariableEvaluator evaluator,
276: Properties qos_tags) {
277: if (path == null) {
278: return null;
279: }
280:
281: path = evaluateVariables(path, evaluator);
282: BoundDataFormula bdf = null;
283: synchronized (bdfCache) {
284: bdf = bdfCache.get(path);
285: if (bdf == null) {
286: try {
287: ResourceNode[] path_spec = PathParser
288: .parsePath(path);
289: bdf = new BoundDataFormula(path_spec);
290: } catch (NullFormulaException ex) {
291: return null;
292: } catch (ResourceDescriptionParseException ex) {
293: return null;
294: }
295: bdfCache.put(path, bdf);
296: }
297: }
298: Object rawValue = bdf.getCurrentValue();
299: // Object rawValue = RSSUtils.getPathValue(path);
300:
301: if (rawValue == null) {
302: return null;
303: } else if (rawValue instanceof DataFormula) {
304: return new DataWrapper(((DataFormula) rawValue).query());
305: } else if (rawValue instanceof DataValue) {
306: return new DataWrapper((DataValue) rawValue);
307: } else {
308: if (loggingService.isErrorEnabled()) {
309: loggingService.error("Unexpected data value "
310: + rawValue + " for path " + path);
311: }
312: return null;
313: }
314: }
315:
316: public Object subscribeToValue(String path, Observer observer) {
317: return subscribeToValue(path, observer, null, null);
318: }
319:
320: public Object subscribeToValue(String path, Observer observer,
321: VariableEvaluator evaluator) {
322: return subscribeToValue(path, observer, evaluator, null);
323: }
324:
325: public Object subscribeToValue(String path, Observer observer,
326: MetricNotificationQualifier qualifier) {
327: return subscribeToValue(path, observer, null, qualifier);
328: }
329:
330: public Object subscribeToValue(String path, Observer observer,
331: VariableEvaluator evaluator,
332: MetricNotificationQualifier qualifier) {
333: path = evaluateVariables(path, evaluator);
334: try {
335: Qualifier qual = qualifier == null ? null : new Qualifier(
336: qualifier);
337: // Defer the formula creation, since it might result in a
338: // 'dns' lookup.
339: ResourceNode[] path_spec = PathParser.parsePath(path);
340: BoundDataFormula bdf = new BoundDataFormula(path_spec,
341: true, qual);
342: Runnable binder = bdf.getDelayedFormulaCreator();
343: subscriptionQueue.add(binder);
344: return new DataValueObserver(observer, bdf);
345: } catch (NullFormulaException ex) {
346: loggingService.error(path + " is not valid");
347: return null;
348: } catch (ResourceDescriptionParseException ex) {
349: loggingService.error(path + " is not valid");
350: return null;
351: }
352:
353: }
354:
355: public void unsubscribeToValue(Object key) {
356: DataValueObserver obs = (DataValueObserver) key;
357: obs.unsubscribe();
358: }
359:
360: }
|