001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.glm.ldm.lps;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Enumeration;
032: import java.util.HashMap;
033: import java.util.Iterator;
034:
035: import org.cougaar.core.blackboard.Directive;
036: import org.cougaar.core.blackboard.EnvelopeTuple;
037: import org.cougaar.core.domain.EnvelopeLogicProvider;
038: import org.cougaar.core.domain.LogicProvider;
039: import org.cougaar.core.domain.MessageLogicProvider;
040: import org.cougaar.core.domain.RestartLogicProvider;
041: import org.cougaar.core.domain.RestartLogicProviderHelper;
042: import org.cougaar.core.domain.RootPlan;
043: import org.cougaar.core.mts.MessageAddress;
044: import org.cougaar.core.util.UID;
045: import org.cougaar.core.util.UniqueObject;
046: import org.cougaar.glm.ldm.GLMFactory;
047: import org.cougaar.glm.ldm.plan.DetailReplyAssignment;
048: import org.cougaar.glm.ldm.plan.DetailRequest;
049: import org.cougaar.glm.ldm.plan.DetailRequestAssignment;
050: import org.cougaar.glm.ldm.plan.QueryReplyAssignment;
051: import org.cougaar.glm.ldm.plan.QueryRequest;
052: import org.cougaar.glm.ldm.plan.QueryRequestAssignment;
053: import org.cougaar.planning.ldm.plan.Transferable;
054: import org.cougaar.util.UnaryPredicate;
055: import org.cougaar.util.log.Logging;
056:
/**
 * Logic Provider for handling Detail and Query Requests and Replies.
 * It is both an EnvelopeLogicProvider and a MessageLogicProvider,
 * so it both reads from the blackboard and accepts directive messages.
 * It is also a RestartLogicProvider, so it is notified of agent restarts.
 */
063:
064: public class DetailRequestLP implements LogicProvider,
065: EnvelopeLogicProvider, RestartLogicProvider,
066: MessageLogicProvider {
067: private final RootPlan rootplan;
068: private final MessageAddress self;
069: private final GLMFactory glmFactory;
070: private final HashMap outstandingRequests = new HashMap(7);
071:
/**
 * Builds the logic provider and stores its three collaborators.
 *
 * @param rootplan   plan handle used by this class to search the blackboard,
 *                   publish adds/changes/removes and send directives
 * @param self       address kept as this agent's own address
 * @param glmFactory factory used by this class to create the request and
 *                   reply assignment directives
 */
public DetailRequestLP(RootPlan rootplan, MessageAddress self,
        GLMFactory glmFactory) {
    this.glmFactory = glmFactory;
    this.self = self;
    this.rootplan = rootplan;
}
078:
079: public void init() {
080: }
081:
082: /**
083: * implements execute from EnvelopePlanLogicProvider
084: * processes DetailRequests and QueryRequests published in
085: * the blackboard
086: */
087: public void execute(EnvelopeTuple o, Collection changes) {
088: Object obj = o.getObject();
089: if (obj instanceof DetailRequest) {
090: Logging.defaultLogger().debug("Received DetailRequest");
091: DetailRequest ir = (DetailRequest) obj;
092: if (o.isAdd()) {
093: processDetailRequestAdded(ir);
094: } else if (o.isRemove()) {
095: // no-nop
096: }
097: } else if (obj instanceof QueryRequest) {
098: Logging.defaultLogger().debug("Received QueryRequest");
099: QueryRequest qr = (QueryRequest) obj;
100: if (o.isAdd()) {
101: processQueryRequestAdded(qr);
102: } else if (o.isRemove()) {
103: // no-op
104: }
105: }
106: }
107:
108: /**
109: * implements execute() from MessageLogicProvider
110: * Processes DetailRequest/ReplyAssignment directives from other agents.
111: * Processes QueryRequest/ReplyAssignment directives from other agents.
112: **/
113: public void execute(Directive dir, Collection changes) {
114: if (dir instanceof DetailReplyAssignment) {
115: Logging.defaultLogger().debug(
116: "Received DetailReplyAssignment");
117: processDetailReplyAssignment((DetailReplyAssignment) dir,
118: changes);
119: } else if (dir instanceof DetailRequestAssignment) {
120: Logging.defaultLogger().debug(
121: "Received DetailRequestAssignment");
122: processDetailRequestAssignment(
123: (DetailRequestAssignment) dir, changes);
124: } else if (dir instanceof QueryRequestAssignment) {
125: Logging.defaultLogger().debug(
126: "Received QueryRequestAssignment");
127: processQueryRequestAssignment((QueryRequestAssignment) dir,
128: changes);
129: } else if (dir instanceof QueryReplyAssignment) {
130: Logging.defaultLogger().debug(
131: "Received QueryReplyAssignment");
132: processQueryReplyAssignment((QueryReplyAssignment) dir,
133: changes);
134: }
135: }
136:
137: // RestartLogicProvider implementation
138:
139: /**
140: * Cluster restart handler. Resend all our DetailRequest
141: * and QueryRequests to the restarted agent.
142: **/
143: public void restart(final MessageAddress cid) {
144: UnaryPredicate pred = new UnaryPredicate() {
145: public boolean execute(Object o) {
146: if (o instanceof DetailRequest) {
147: DetailRequest ir = (DetailRequest) o;
148: MessageAddress dest = ir.getSourceCluster();
149: return RestartLogicProviderHelper.matchesRestart(
150: self, cid, dest);
151: }
152: return false;
153: }
154: };
155: Enumeration en = rootplan.searchBlackboard(pred);
156: while (en.hasMoreElements()) {
157: DetailRequest ir = (DetailRequest) en.nextElement();
158: processDetailRequestAdded(ir);
159: }
160:
161: UnaryPredicate queryPred = new UnaryPredicate() {
162: public boolean execute(Object o) {
163: if (o instanceof QueryRequest) {
164: QueryRequest ir = (QueryRequest) o;
165: MessageAddress dest = ir.getSourceCluster();
166: return RestartLogicProviderHelper.matchesRestart(
167: self, cid, dest);
168: }
169: return false;
170: }
171: };
172: Enumeration queryEnum = rootplan.searchBlackboard(queryPred);
173: while (queryEnum.hasMoreElements()) {
174: QueryRequest ir = (QueryRequest) queryEnum.nextElement();
175: processQueryRequestAdded(ir);
176: }
177: }
178:
179: /**
180: * Turn request into assignment.
181: * First step in the process. A request read from the blackboard
182: * is turned into an assignment, and sent to the agent where
183: * the object lives.
184: */
185: private void processDetailRequestAdded(DetailRequest dr) {
186: // First, check to see if we are already waiting for this object
187: UID uid = dr.getDetailUID();
188: if (outstandingRequests.containsValue(uid)) {
189: return;
190: }
191:
192: outstandingRequests.put(uid, uid);
193:
194: // create an DetailRequestAssignment directive
195: DetailRequestAssignment dra = glmFactory
196: .newDetailRequestAssignment(dr);
197: Logging.defaultLogger().debug(
198: "Sending DetailRequestAssignment to "
199: + dra.getDestination());
200: // Give the directive to the blackboard for tranmission
201: rootplan.sendDirective(dra);
202: }
203:
204: /**
205: * Turn request assignment into reply assignment.
206: * The second and third steps in the process.
207: * A request for an object is received on the agent where the object resides.
208: * The object is found, packaged, and sent back to agent where the request originated.
209: */
210: private void processDetailRequestAssignment(
211: DetailRequestAssignment ta, Collection changes) {
212: DetailRequest request = (DetailRequest) ta.getDetailRequest();
213: UID uid = request.getDetailUID();
214: Logging.defaultLogger().debug("UID: " + uid);
215: try {
216: UniqueObject uo = (UniqueObject) rootplan
217: .findUniqueObject(uid);
218: if (uo instanceof Transferable) {
219: // Clone so we don't have cross agent references to the same object
220: uo = (UniqueObject) ((Transferable) uo).clone();
221: } else {
222: Logging
223: .defaultLogger()
224: .warn(
225: uo
226: + " does not implement Transferable."
227: + " Fullfillment of the DetailRequest may result \n"
228: + " in cross agent references to the same object.");
229: }
230: DetailReplyAssignment dra = glmFactory
231: .newDetailReplyAssignment(uo, uid, self, request
232: .getRequestingCluster());
233:
234: Logging.defaultLogger().debug(
235: "DetailReplyAssignment " + dra);
236: Logging.defaultLogger()
237: .debug(
238: "DetailReplyAssignment UID: "
239: + dra.getRequestUID());
240: rootplan.sendDirective(dra);
241: } catch (RuntimeException excep) {
242: excep.printStackTrace();
243: }
244: }
245:
246: /**
247: * Publish the result of the request to the blackboard.
248: * The last step in the process. An answer has returned to the
249: * originating agent, and is published here.
250: */
251: private void processDetailReplyAssignment(
252: DetailReplyAssignment reply, Collection changes) {
253: UniqueObject obj = reply.getDetailObject();
254: final UID replyUID = reply.getRequestUID();
255: if (obj == null) {
256: Logging.defaultLogger().warn(
257: "Object not found on remote agent " + replyUID);
258: cleanup(replyUID);
259: return;
260: }
261:
262: UniqueObject existingObj = rootplan.findUniqueObject(obj
263: .getUID());
264: if (existingObj != null) {
265:
266: // Copy fields so the changes actually appear
267: if (obj instanceof Transferable) {
268: ((Transferable) existingObj).setAll((Transferable) obj);
269: } else {
270: Logging
271: .defaultLogger()
272: .warn(
273: existingObj
274: + " does not implement Transferable."
275: + " Changes to source will not be visible.");
276: }
277:
278: try {
279: rootplan.change(obj, changes);
280: } catch (RuntimeException re) {
281: re.printStackTrace();
282: }
283: } else {
284: try {
285: Logging.defaultLogger().debug(
286: "Publishing DetailReply " + reply
287: + obj.getUID());
288: rootplan.add(obj);
289: } catch (RuntimeException excep) {
290: excep.printStackTrace();
291: }
292: }
293: cleanup(replyUID);
294: }
295:
296: /**
297: * Removes DetailRequests from the blackboard. Also removes uids from
298: * the outstanding requests hash.
299: */
300: private void cleanup(final UID cleanupUID) {
301: // clear out Requests for this object
302: outstandingRequests.remove(cleanupUID);
303:
304: UnaryPredicate pred = new UnaryPredicate() {
305: public boolean execute(Object o) {
306: if (o instanceof DetailRequest) {
307: DetailRequest dr = (DetailRequest) o;
308: UID uid = dr.getDetailUID();
309: if (uid == null) {
310: Logging.defaultLogger().error(
311: "Null UID for " + dr);
312: return false;
313: }
314: if (cleanupUID == null) {
315: Logging.defaultLogger().error(
316: "Null cleanup UID");
317: return false;
318: }
319: if (uid.equals(cleanupUID))
320: return true;
321: }
322: return false;
323: }
324: };
325: Enumeration requests = rootplan.searchBlackboard(pred);
326:
327: while (requests.hasMoreElements()) {
328: DetailRequest dr = (DetailRequest) requests.nextElement();
329: Logging.defaultLogger().debug(
330: "Removing DetailRequest from blackboard: " + dr);
331: rootplan.remove(dr);
332: }
333: }
334:
335: /**
336: * Turn request into assignment.
337: * First step in the process. A request read from the blackboard
338: * is turned into an assignment, and sent to the agent where
339: * the object lives.
340: */
341: private void processQueryRequestAdded(QueryRequest qr) {
342: // First, check to see if we are already waiting for this object
343: UnaryPredicate pred = qr.getQueryPredicate();
344:
345: if (outstandingRequests.containsValue(pred)) {
346: Logging.defaultLogger().debug(
347: "Outstanding QueryRequestAssignment");
348: return;
349: }
350:
351: outstandingRequests.put(pred, pred);
352:
353: // create an QueryRequestAssignment directive
354: QueryRequestAssignment qra = glmFactory
355: .newQueryRequestAssignment(qr);
356: Logging.defaultLogger().debug(
357: "Sending QueryRequestAssignment to "
358: + qra.getDestination());
359: // Give the directive to the blackboard for tranmission
360: rootplan.sendDirective(qra);
361: }
362:
363: /**
364: * Turn request assignment into reply assignment.
365: * The second and third steps in the process.
366: * A request for a query is received on the agent.
367: * The query is executed and the results are packaged,
368: * and sent back to agent where the request originated.
369: */
370: private void processQueryRequestAssignment(
371: QueryRequestAssignment ta, Collection changes) {
372: QueryRequest request = (QueryRequest) ta.getQueryRequest();
373: UnaryPredicate pred = request.getQueryPredicate();
374: ArrayList collection;
375: Logging.defaultLogger().debug("QueryPredicate: " + pred);
376: try {
377: Enumeration e = rootplan.searchBlackboard(pred);
378: collection = new ArrayList(7);
379: while (e.hasMoreElements()) {
380: Object next = e.nextElement();
381:
382: if (next instanceof Transferable) {
383: // Clone so we don't end up with cross agent refs to the same object
384: next = ((Transferable) next).clone();
385: } else {
386: Logging
387: .defaultLogger()
388: .warn(
389: next
390: + " does not implement Transferable."
391: + " Fullfillment of the QueryRequest may result \n"
392: + " in cross agent references to the same object.");
393:
394: }
395: collection.add(next);
396: }
397:
398: QueryReplyAssignment dra = glmFactory
399: .newQueryReplyAssignment(collection, pred, request
400: .getLocalQueryPredicate(), self, request
401: .getRequestingCluster());
402:
403: Logging.defaultLogger()
404: .debug("QueryReplyAssignment " + dra);
405: Logging.defaultLogger().debug(
406: "QueryReplyAssignment Pred: "
407: + dra.getRequestPredicate());
408: Logging.defaultLogger().debug(
409: "QueryReplyAssignment requestor: "
410: + request.getRequestingCluster());
411: rootplan.sendDirective(dra);
412: } catch (RuntimeException excep) {
413: excep.printStackTrace();
414: }
415: }
416:
417: /**
418: * Publish the result of the query request to the blackboard.
419: * The last step in the process. An answer has returned to the
420: * originating agent, and is published here.
421: */
422: private void processQueryReplyAssignment(
423: QueryReplyAssignment reply, Collection changes) {
424: Collection replyCollection = reply.getQueryResponse();
425: final UnaryPredicate replyPredicate = reply
426: .getRequestPredicate();
427: if ((replyCollection == null) || replyCollection.isEmpty()) {
428: Logging.defaultLogger().debug(
429: "Query on remote agent returned no values "
430: + replyPredicate);
431: cleanup(replyPredicate);
432: return;
433: }
434:
435: //Compare reply collection with local collection.
436: ArrayList localCollection = new ArrayList();
437: if (reply.getLocalPredicate() != null) {
438: Enumeration localEnum = rootplan.searchBlackboard(reply
439: .getLocalPredicate());
440: while (localEnum.hasMoreElements()) {
441: localCollection.add(localEnum.nextElement());
442: }
443: }
444:
445: for (Iterator it = replyCollection.iterator(); it.hasNext();) {
446: Object obj = it.next();
447: if (obj instanceof UniqueObject) {
448: Object localObj = rootplan
449: .findUniqueObject(((UniqueObject) obj).getUID());
450:
451: if (localObj != null) {
452: // Only publish change if object is really different
453: if (!localObj.equals(obj)) {
454: if (obj instanceof Transferable) {
455: ((Transferable) localObj)
456: .setAll((Transferable) obj);
457: } else {
458: Logging
459: .defaultLogger()
460: .warn(
461: localObj
462: + " does not implement Transferable."
463: + " Changes will not be visible.");
464: }
465: rootplan.change(obj, changes);
466: } else {
467: Logging.defaultLogger().debug(
468: " not publish changing existing obj "
469: + obj);
470: }
471: } else {
472: Logging.defaultLogger().debug(
473: "Publishing QueryReply " + reply + obj);
474: rootplan.add(obj);
475: }
476: } else {
477: // Not unique
478: // Look for object match in local collection
479: if (findMatch(obj, localCollection) == null) {
480: rootplan.add(obj);
481: }
482: }
483: }
484:
485: //Remove local objects which are not in the reply collection
486: for (Iterator iterator = localCollection.iterator(); iterator
487: .hasNext();) {
488: Object localObj = iterator.next();
489: if (findMatch(localObj, replyCollection) == null) {
490: rootplan.remove(localObj);
491: }
492: }
493:
494: cleanup(replyPredicate);
495: }
496:
497: /**
498: * Removes QueryRequests from the blackboard. Also removes uids from
499: * the outstanding requests hash.
500: */
501: private void cleanup(final UnaryPredicate cleanupPred) {
502: // clear out Requests for this object
503: outstandingRequests.remove(cleanupPred);
504:
505: Enumeration requests = rootplan
506: .searchBlackboard(new UnaryPredicate() {
507: public boolean execute(Object o) {
508: if (o instanceof QueryRequest) {
509: QueryRequest dr = (QueryRequest) o;
510: UnaryPredicate pred = dr
511: .getQueryPredicate();
512: if (pred == null) {
513: Logging.defaultLogger().error(
514: "Predicate is null for " + dr);
515: return false;
516: }
517: if (cleanupPred == null) {
518: Logging
519: .defaultLogger()
520: .error(
521: "cleanup() called with a null predicate.");
522: return false;
523: }
524: if (pred.equals(cleanupPred))
525: return true;
526: }
527: return false;
528: }
529: });
530:
531: while (requests.hasMoreElements()) {
532: QueryRequest qr = (QueryRequest) requests.nextElement();
533: Logging.defaultLogger().debug(
534: "Removing QueryRequest from blackboard: " + qr);
535: rootplan.remove(qr);
536: }
537: }
538:
539: private Object findMatch(Object object, Collection collection) {
540:
541: if (object instanceof UniqueObject) {
542: UID uid = ((UniqueObject) object).getUID();
543:
544: for (Iterator iterator = collection.iterator(); iterator
545: .hasNext();) {
546: Object match = iterator.next();
547: if (match instanceof UniqueObject) {
548: if (((UniqueObject) match).getUID().equals(uid)) {
549: return match;
550: }
551: }
552: }
553: } else {
554: for (Iterator iterator = collection.iterator(); iterator
555: .hasNext();) {
556: Object match = iterator.next();
557: if (match.equals(object)) {
558: return match;
559: }
560: }
561: }
562:
563: return null;
564: }
565: }
|