001: /*
002: * <copyright>
003: *
004: * Copyright 2003-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.lib.aggagent.query;
028:
029: import java.io.Serializable;
030: import java.util.Collection;
031: import java.util.Enumeration;
032: import java.util.HashMap;
033: import java.util.HashSet;
034: import java.util.Iterator;
035: import java.util.LinkedList;
036: import java.util.List;
037: import java.util.Map;
038: import java.util.Set;
039: import java.util.Vector;
040:
041: import org.cougaar.lib.aggagent.session.UpdateDelta;
042: import org.cougaar.lib.aggagent.session.XmlTransferable;
043: import org.cougaar.lib.aggagent.util.InverseSax;
044: import org.cougaar.lib.aggagent.util.XmlUtils;
045: import org.w3c.dom.Element;
046: import org.w3c.dom.NodeList;
047:
048: /**
049: * A Repository for results being returned by Clusters for the associated
050: * AggregationQuery.
051: */
052: public class AggregationResultSet implements XmlTransferable,
053: Serializable {
054: public static String RESULT_SET_TAG = "result_set";
055: public static String QUERY_ID_ATT = "query_id";
056:
057: private static String EXCEPTION_TAG = "resultset_exception";
058: private static String CLUSTER_ID_ATT = "clusterId";
059: private static String CLUSTER_TAG = "cluster";
060: private static String ID_ATT = "id";
061:
062: private static String CLUSTER_IDENTIFIER = "cluster";
063: private static String AGGREGATED_IDENTIFIER = "aggregated";
064:
065: private Object lock = new Serializable() {
066: };
067:
068: private QueryResultAdapter query = null;
069: private List idNames = new LinkedList();
070: private boolean firstUpdate = true;
071: private Map clusterTable = new HashMap();
072: private Map exceptionMap = new HashMap();
073: private Set respondingClusters = new HashSet();
074: private UpdateObservable updateObservable = new UpdateObservable();
075: private List resultSetChangeListeners = new LinkedList();
076:
077: /**
078: * Default Constructor
079: */
080: public AggregationResultSet() {
081: }
082:
083: /**
084: * Create Result Set from xml.
085: */
086: public AggregationResultSet(Element root) {
087: NodeList nl = root.getElementsByTagName(EXCEPTION_TAG);
088: for (int i = 0; i < nl.getLength(); i++) {
089: Element exceptionElement = (Element) nl.item(i);
090: String clusterId = exceptionElement
091: .getAttribute(CLUSTER_ID_ATT);
092: String exceptionDescription = XmlUtils
093: .getElementText(exceptionElement);
094: exceptionMap.put(clusterId, exceptionDescription);
095: }
096:
097: nl = root.getElementsByTagName(CLUSTER_TAG);
098: for (int i = 0; i < nl.getLength(); i++) {
099: Element cluster = (Element) nl.item(i);
100: String cid = cluster.getAttribute(ID_ATT);
101: createAtomsByAgent(cid, cluster);
102: }
103: }
104:
105: private void createAtomsByAgent(String agentId, Element root) {
106: NodeList nl = root
107: .getElementsByTagName(ResultSetDataAtom.DATA_ATOM_TAG);
108: for (int i = 0; i < nl.getLength(); i++)
109: update(agentId, new ResultSetDataAtom((Element) nl.item(i)));
110: }
111:
112: /**
113: * Specify the query (etc.) for this result set.
114: */
115: public void setQueryAdapter(QueryResultAdapter s) {
116: query = s;
117: }
118:
119: /**
120: * Provide access to this result set's QueryResultAdapter
121: */
122: public QueryResultAdapter getQueryAdapter() {
123: return query;
124: }
125:
126: /**
127: * Returns the list of clusters contained in this result set
128: */
129: public Enumeration getClusters() {
130: // return as an enumeration so that elements cannot be removed
131: Vector keys = new Vector(clusterTable.keySet());
132: return keys.elements();
133: }
134:
135: /**
136: * Set an exception message for a cluster that occured when attempting
137: * to update this result set (or setup query).
138: */
139: public void setException(String clusterId, String exceptionMessage) {
140: exceptionMap.put(clusterId, exceptionMessage);
141: }
142:
143: /**
144: * Return a map of exception descriptions thrown by source clusters when
145: * attempting to update this result set. Map keys are clusterId strings.
146: * Map values are exception description strings.
147: */
148: public Map getExceptionMap() {
149: return exceptionMap;
150: }
151:
152: /**
153: * Return a string summary of exception descriptions thrown by source
154: * clusters when attempting to update this result set.
155: */
156: public String getExceptionSummary() {
157: StringBuffer s = new StringBuffer();
158: for (Iterator i = exceptionMap.values().iterator(); i.hasNext();) {
159: s.append(i.next().toString() + "\n");
160: s.append("-----------------------------\n");
161: }
162: return s.toString();
163: }
164:
165: /**
166: * Returns true if an exception was thrown by a source cluster when
167: * attempting to run the query for this result set.
168: */
169: public boolean exceptionThrown() {
170: return exceptionMap.size() > 0;
171: }
172:
173: /**
174: * Update this AggregationResultSet by inserting a new data atom into the
175: * table. The provided clusterId identifies the cluster of origin of the
176: * datum.
177: */
178: private void update(String clusterId, ResultSetDataAtom atom) {
179: if (firstUpdate) {
180: firstUpdate = false;
181: for (Iterator i = atom.getIdentifierNames(); i.hasNext();)
182: idNames.add(i.next());
183: }
184:
185: Map data = (Map) clusterTable.get(clusterId);
186: if (data == null)
187: clusterTable.put(clusterId, data = new HashMap());
188:
189: data.put(atom.getKey(idNames), atom.getValueMap());
190:
191: synchronized (respondingClusters) {
192: respondingClusters.add(clusterId);
193: }
194: }
195:
196: /**
197: * Remove a ResultSetDataAtom from the result set.
198: */
199: private void remove(String clusterId, ResultSetDataAtom atom) {
200: Map data = (Map) clusterTable.get(clusterId);
201: if (data != null)
202: data.remove(atom.getKey(idNames));
203: }
204:
205: /**
206: * Update this AggregationResultSet by inserting a series of data purported
207: * to come from the specified cluster. The data are presented in XML format
208: * and must be parsed into individual ResultSetDataAtoms.
209: */
210: private void update(String agentId, Collection atoms) {
211: for (Iterator i = atoms.iterator(); i.hasNext();)
212: update(agentId, (ResultSetDataAtom) i.next());
213: }
214:
215: /**
216: * Remove a series of data from this result set.
217: */
218: private void remove(String agentId, Collection atoms) {
219: for (Iterator i = atoms.iterator(); i.hasNext();)
220: remove(agentId, (ResultSetDataAtom) i.next());
221: }
222:
223: private void removeAll(String agentId) {
224: Map table = (Map) clusterTable.get(agentId);
225: if (table != null)
226: table.clear();
227: }
228:
229: public void incrementalUpdate(UpdateDelta delta) {
230: String agentId = delta.getAgentId();
231:
232: synchronized (respondingClusters) {
233: respondingClusters.add(agentId);
234: }
235:
236: // update result set based on incremental change xml
237: synchronized (lock) {
238: if (delta.isErrorReport()) {
239: setException(delta.getAgentId(), delta.getErrorReport());
240: } else if (delta.isReplacement()) {
241: removeAll(agentId);
242: update(agentId, delta.getReplacementList());
243: } else {
244: update(agentId, delta.getAddedList());
245: update(agentId, delta.getChangedList());
246: remove(agentId, delta.getRemovedList());
247: }
248: }
249: }
250:
251: /**
252: * Used to completely remove the data associated with a specific agent from the
253: * result set.
254: */
255:
256: protected void removeClusterId(String clusterId) {
257: synchronized (lock) {
258: clusterTable.remove(clusterId);
259: }
260: }
261:
262: private void removeAllAtoms() {
263: clusterTable.clear();
264: }
265:
266: public void replaceAggregated(List atoms) {
267: synchronized (lock) {
268: removeAllAtoms();
269: for (Iterator i = atoms.iterator(); i.hasNext();)
270: update(AGGREGATED_IDENTIFIER, (ResultSetDataAtom) i
271: .next());
272: }
273: fireObjectChanged();
274: }
275:
276: /**
277: * Update this result set to match passed in result set
278: */
279: public void update(AggregationResultSet rs) {
280: this .idNames = rs.idNames;
281: this .firstUpdate = rs.firstUpdate;
282: this .clusterTable = rs.clusterTable;
283: this .exceptionMap = rs.exceptionMap;
284:
285: fireObjectChanged();
286: }
287:
288: /**
289: * Add an update listener to observe this object
290: */
291: public void addUpdateListener(UpdateListener ul) {
292: updateObservable.addUpdateListener(ul);
293: }
294:
295: /**
296: * Remove an update listener such that it no longer gets notified of changes
297: * to this object
298: */
299: public void removeUpdateListener(UpdateListener ul) {
300: updateObservable.removeUpdateListener(ul);
301: }
302:
303: /**
304: * Send event to all update listeners indicating that object has been added
305: * to the log plan.
306: */
307: public void fireObjectAdded() {
308: updateObservable.fireObjectAdded(this );
309: }
310:
311: /**
312: * Send event to all update listeners indicating that object has been removed
313: * from the log plan.
314: */
315: public void fireObjectRemoved() {
316: updateObservable.fireObjectRemoved(this );
317: }
318:
319: /**
320: * Send event to all update listeners indicating that object has been changed
321: * on the log plan.
322: */
323: private void fireObjectChanged() {
324: updateObservable.fireObjectChanged(this );
325: }
326:
327: public Iterator getAllAtoms() {
328: List l = new LinkedList();
329: synchronized (lock) {
330: for (Iterator c = clusterTable.entrySet().iterator(); c
331: .hasNext();) {
332: Map.Entry cluster = (Map.Entry) c.next();
333: Object name = cluster.getKey();
334: Map atoms = (Map) cluster.getValue();
335: for (Iterator v = atoms.entrySet().iterator(); v
336: .hasNext();) {
337: Map.Entry pair = (Map.Entry) v.next();
338: ResultSetDataAtom a = new ResultSetDataAtom(
339: idNames, (CompoundKey) pair.getKey(),
340: (Map) pair.getValue());
341: a.addIdentifier(CLUSTER_IDENTIFIER, name);
342: l.add(a);
343: }
344: }
345: }
346: return l.iterator();
347: }
348:
349: public String toXml() {
350: InverseSax doc = new InverseSax();
351: includeXml(doc);
352: return doc.toString();
353: }
354:
355: public void includeXml(InverseSax doc) {
356: doc.addElement(RESULT_SET_TAG);
357: if (query != null)
358: doc.addAttribute(QUERY_ID_ATT, query.getID());
359:
360: synchronized (lock) {
361: for (Iterator i = exceptionMap.entrySet().iterator(); i
362: .hasNext();) {
363: Map.Entry entry = (Map.Entry) i.next();
364: doc.addEltAttText(EXCEPTION_TAG, CLUSTER_ID_ATT, entry
365: .getKey().toString(), entry.getValue()
366: .toString());
367: }
368:
369: for (Iterator i = clusterTable.entrySet().iterator(); i
370: .hasNext();) {
371: Map.Entry table = (Map.Entry) i.next();
372: doc.addElement(CLUSTER_TAG);
373: doc.addAttribute(ID_ATT, table.getKey().toString());
374: for (Iterator j = ((Map) table.getValue()).entrySet()
375: .iterator(); j.hasNext();) {
376: Map.Entry entry = (Map.Entry) j.next();
377: (new ResultSetDataAtom(idNames, (CompoundKey) entry
378: .getKey(), (Map) entry.getValue()))
379: .includeXml(doc);
380: }
381: doc.endElement();
382: }
383: }
384:
385: doc.endElement();
386: }
387:
388: public Set getRespondingClusters() {
389: // pass a copy back (iteration needs to be synchronized with adds)
390: Set responded = null;
391: synchronized (respondingClusters) {
392: responded = new HashSet();
393: Iterator iter = respondingClusters.iterator();
394: while (iter.hasNext())
395: responded.add(iter.next());
396: }
397:
398: return responded;
399: }
400: }
|