001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.relay;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.Enumeration;
033: import java.util.HashMap;
034: import java.util.HashSet;
035: import java.util.Iterator;
036: import java.util.Map;
037: import java.util.Set;
038:
039: import org.cougaar.core.blackboard.ABATranslation;
040: import org.cougaar.core.blackboard.ChangeReport;
041: import org.cougaar.core.blackboard.Directive;
042: import org.cougaar.core.blackboard.EnvelopeTuple;
043: import org.cougaar.core.domain.ABAChangeLogicProvider;
044: import org.cougaar.core.domain.EnvelopeLogicProvider;
045: import org.cougaar.core.domain.LogicProvider;
046: import org.cougaar.core.domain.MessageLogicProvider;
047: import org.cougaar.core.domain.RestartLogicProvider;
048: import org.cougaar.core.domain.RootPlan;
049: import org.cougaar.core.mts.MessageAddress;
050: import org.cougaar.core.util.UID;
051: import org.cougaar.core.util.UniqueObject;
052: import org.cougaar.multicast.AttributeBasedAddress;
053: import org.cougaar.util.UnaryPredicate;
054: import org.cougaar.util.log.Logger;
055: import org.cougaar.util.log.LoggerFactory;
056:
057: /**
058: * A {@link LogicProvider} to transmit and update {@link Relay}
059: * objects.
060: *
061: * @see Relay
062: */
063: public class RelayLP implements LogicProvider, EnvelopeLogicProvider,
064: MessageLogicProvider, RestartLogicProvider,
065: ABAChangeLogicProvider {
066: private final RootPlan rootplan;
067: private final MessageAddress self;
068: private final Relay.Token token;
069:
070: private final Logger logger = LoggerFactory.getInstance()
071: .createLogger(getClass());
072:
073: public RelayLP(RootPlan rootplan, MessageAddress self) {
074: this .rootplan = rootplan;
075: this .self = self;
076: token = TokenImpl.getToken(self);
077: }
078:
079: public void init() {
080: }
081:
082: // EnvelopeLogicProvider implementation
083: /**
084: * Sends the Content of Relay sources to the their targets and sends
085: * target responses back to the source.
086: * @param o an EnvelopeTuple where the tuple.object is
087: * a Relay.Source or Relay.Target
088: */
089: public void execute(EnvelopeTuple o, Collection changes) {
090: Object obj = o.getObject();
091: if (obj instanceof Relay) { // Quick test for Target or Source
092: if (changes != null
093: && changes.contains(MarkerReport.INSTANCE)) {
094: // Ignore changes containing our MarkerReport
095: // This avoids looping
096: return;
097: }
098: if (obj instanceof Relay.Target) {
099: Relay.Target rt = (Relay.Target) obj;
100: // Only changes are significant at a Target
101: // The target is sending a response back to the source
102: if (o.isChange()) {
103: localResponse(rt, changes);
104: }
105: }
106:
107: // Note no else -- so something both a Target and a Relay
108: // will run through all of these
109:
110: if (obj instanceof Relay.Source) {
111: Relay.Source rs = (Relay.Source) obj;
112: if (o.isAdd()) {
113: // New relay to be sent to targets.
114: // Note that a Relay.Target just published at Dest that's also
115: // a Relay.Source would get in here -- so must have the MarkerReport
116: localAdd(rs);
117: } else if (o.isChange()) {
118: // New relay content or targets list
119: // Note that a Relay.Target just changed at Dest that's also
120: // a Relay.Source would get in here -- so must have the MarkerReport
121: localChange(rs, changes);
122: } else if (o.isRemove()) {
123: // Remove the relay from the Targets
124: // Note that a Relay.Target just changed at Dest that's also
125: // a Relay.Source would get in here -- so must have the MarkerReport
126: localRemove(rs);
127: }
128: }
129: }
130: }
131:
132: // New Relay.Source added. Only called from LP.execute()
133: private void localAdd(Relay.Source rs) {
134: Set targets = rs.getTargets();
135: if (targets == null)
136: return;
137: if (targets.isEmpty())
138: return; // No targets
139: localAdd(rs, targets);
140: }
141:
142: // Propogate the new Relay to each listed target
143: // Called from above localAdd and from abaChange when an aba expands
144: // to new targets.
145: private void localAdd(Relay.Source rs, Set targets) {
146: // If this were also a target, we could check that this agent
147: // is the source. That might help break looping
148: boolean gotContent = false;
149: Object content = null;
150: for (Iterator i = targets.iterator(); i.hasNext();) {
151: MessageAddress target = (MessageAddress) i.next();
152: if (target == null) {
153: // Ignore nulls.
154: } else if (target.getPrimary().equals(self)) {
155: // Never send to self. Likely an error.
156: } else {
157: if (!gotContent) {
158: gotContent = true;
159: content = rs.getContent();
160: }
161: sendAdd(rs, target, content);
162: }
163: }
164: }
165:
166: /**
167: * Handle a change to this source. We need to send the new content
168: * to the targets.
169: */
170: private void localChange(Relay.Source rs, Collection changes) {
171: // called from changeTarget, receiveResponse, and LP.execute
172: Set targets = rs.getTargets();
173: Collection oldTargets = null;
174: // Get the oldtargets mentioned in the _first_ RelayChangeReport
175: // (if there are many, later ones are ignored)
176: if (changes != null) {
177: for (Iterator i = changes.iterator(); i.hasNext();) {
178: Object o = i.next();
179: if (o instanceof RelayChangeReport) {
180: if (oldTargets == null) {
181: RelayChangeReport rcr = (RelayChangeReport) o;
182: oldTargets = rcr.getOldTargets();
183: }
184: i.remove();
185: }
186: }
187: }
188:
189: // If we got targets from a ChangeReport above, winnow that
190: // down to targets no longer in the targets list.
191: // Tell each such agent to remove this Relay
192: if (oldTargets != null) {
193: if (targets != null)
194: oldTargets.removeAll(targets);
195: UID uid = rs.getUID();
196: for (Iterator i = oldTargets.iterator(); i.hasNext();) {
197: MessageAddress target = (MessageAddress) i.next();
198: sendRemove(uid, target);
199: }
200: }
201: if (targets == null || targets.isEmpty()) {
202: return; // No targets
203: }
204:
205: // FIXME check for targets-change-report:
206: // calculate set differences
207: // for added targets: sendAdd
208: // for removed targets: sendRemove
209: // add ContentReport to changes
210: boolean gotContent = false;
211: Object content = null;
212: for (Iterator i = targets.iterator(); i.hasNext();) {
213: MessageAddress target = (MessageAddress) i.next();
214: if (target == null) {
215: // Ignore nulls.
216: } else if (target.getPrimary().equals(self)) {
217: // Never send to self. Likely an error.
218: } else {
219: if (!gotContent) {
220: gotContent = true;
221: content = rs.getContent();
222: }
223: // This target could be an ABA that includes this agent, right?
224: sendChange(rs, target, content, changes);
225: }
226: }
227: }
228:
229: // Local Relay.Source was publishRemoved
230: // Called from lp.execute
231: private void localRemove(Relay.Source rs) {
232: Set targets = rs.getTargets();
233: if (targets == null)
234: return;
235: if (targets.isEmpty())
236: return; // No targets
237: // Again, if this is also a Relay.Target, could check that this is
238: // really the source
239: localRemove(rs.getUID(), targets);
240: }
241:
242: // Propogate removal of relay to each target
243: // called from above, ie lp.execute, and from abaChange
244: private void localRemove(UID uid, Set targets) {
245: for (Iterator i = targets.iterator(); i.hasNext();) {
246: MessageAddress target = (MessageAddress) i.next();
247: if (target == null) {
248: // Ignore nulls.
249: } else if (target.getPrimary().equals(self)) {
250: // Never send to self. Likely an error.
251: } else {
252: // Again, what if the target is an ABA that includes this agent?
253: sendRemove(uid, target);
254: }
255: }
256: }
257:
258: /**
259: * Handle a change to this target. We need to send the new response
260: * to the source
261: */
262: private void localResponse(Relay.Target rt, Collection changes) {
263: // called from changeTarget, receiveResponse, LP.execute
264: MessageAddress source = rt.getSource();
265: if (source == null)
266: return; // No source
267: if (self.equals(source.getPrimary()))
268: return; // BOGUS source must be elsewhere. Ignore.
269:
270: Object resp = rt.getResponse();
271: // cancel if response is null
272: if (resp == null)
273: return;
274:
275: sendResponse(rt, source, resp, changes);
276: }
277:
278: // Send directive to given target Agent to add this Relay
279: // called from localAdd and resend
280: private void sendAdd(Relay.Source rs, MessageAddress target,
281: Object content) {
282: RelayDirective.Add dir = new RelayDirective.Add(rs.getUID(),
283: content, rs.getTargetFactory());
284: dir.setSource(self);
285: dir.setDestination(target);
286: rootplan.sendDirective(dir);
287: }
288:
289: // Send directive to given target Agent of change to this Relay
290: // called from localChange
291: private void sendChange(Relay.Source rs, MessageAddress target,
292: Object content, Collection c) {
293: RelayDirective.Change dir = new RelayDirective.Change(rs
294: .getUID(), content, rs.getTargetFactory());
295: dir.setSource(self);
296: dir.setDestination(target);
297: rootplan.sendDirective(dir, c);
298: }
299:
300: // Send directive to given target agent to remove this Relay
301: // called from localChange, localRemove, receiveResponse
302: private void sendRemove(UID uid, MessageAddress target) {
303: RelayDirective.Remove dir = new RelayDirective.Remove(uid);
304: dir.setSource(self);
305: dir.setDestination(target);
306: rootplan.sendDirective(dir);
307: }
308:
309: // Send directive back to the Source of Response from this Target
310: // called from sendVerification, addTarget, localResponse
311: private void sendResponse(Relay.Target rt, MessageAddress source,
312: Object resp, Collection c) {
313: RelayDirective.Response dir = new RelayDirective.Response(rt
314: .getUID(), resp);
315: dir.setSource(self);
316: dir.setDestination(source);
317: rootplan.sendDirective(dir, c);
318: }
319:
320: // Resend latest (possibly null) response from this target to the source
321: // called from verify
322: private void sendVerification(Relay.Target rt, MessageAddress source) {
323: Object resp = rt.getResponse();
324: // Send even if null response
325: sendResponse(rt, source, resp, Collections.EMPTY_SET);
326: }
327:
328: // MessageLogicProvider implementation
329: public void execute(Directive dir, Collection changes) {
330: if (dir instanceof RelayDirective) {
331: // Quick test for one of ours
332: if (self.equals(dir.getSource().getPrimary()))
333: return;
334:
335: if (dir instanceof RelayDirective.Change) {
336: receiveChange((RelayDirective.Change) dir, changes);
337: return;
338: }
339: if (dir instanceof RelayDirective.Add) {
340: receiveAdd((RelayDirective.Add) dir);
341: return;
342: }
343: if (dir instanceof RelayDirective.Remove) {
344: receiveRemove((RelayDirective.Remove) dir);
345: return;
346: }
347: if (dir instanceof RelayDirective.Response) {
348: receiveResponse((RelayDirective.Response) dir, changes);
349: return;
350: }
351: }
352: }
353:
354: // called from receiveAdd and receiveChange
355: // In the target agent, add the Relay.Target (which may also implement Relay.Source)
356: private void addTarget(Relay.TargetFactory tf, Object cont,
357: RelayDirective dir) {
358: Relay.Target rt;
359: if (tf != null) {
360: rt = tf.create(dir.getUID(), dir.getSource(), cont, token);
361: } else if (cont instanceof Relay.Target) {
362: rt = (Relay.Target) cont;
363: } else {
364: // ERROR cannot create target
365: return;
366: }
367: if (rt == null)
368: return; // Target should not exist here
369: // Add the target. Note that if it is also a source,
370: // this LP will wake up again, and try to send the relay
371: // to all the targets.
372: /// FIXME: This is a place to block relaying. Add a Marker report?
373: rootplan.add(rt);
374: // Check for immediate response due to arrival
375: Object resp = rt.getResponse();
376: if (resp != null) {
377: sendResponse(rt, dir.getSource(), resp,
378: Collections.EMPTY_SET);
379: }
380: }
381:
382: // called from receiveAdd and receiveChange
383: private void changeTarget(Relay.Target rt, Object cont,
384: Collection changes) {
385: // Branch on the change type flag.
386: // If the content changed, then mark the taret as changed,
387: // but in such a way that this LP won't run again
388: int flags = rt.updateContent(cont, token);
389: if ((flags & Relay.CONTENT_CHANGE) != 0) {
390: Collection c;
391: if (changes == null) {
392: c = Collections.singleton(MarkerReport.INSTANCE);
393: } else {
394: c = new ArrayList(changes);
395: c.add(MarkerReport.INSTANCE);
396: }
397: // Note the MarkerReport is on this change,
398: // so the LP will not think the Response changed
399: // and needs to flow back
400: rootplan.change(rt, c);
401: // FIXME: What is this for?!!
402: // Note I made localChange bail if this is not the Source
403: // Presumably this is for chaining. It means that if a content
404: // change comes in to this agent, and the local Target is also
405: // a source, we can let this LP pretend the change
406: // was local, and propogate it to the listed targets
407: // FIXME!!
408: if (rt instanceof Relay.Source)
409: localChange((Relay.Source) rt, changes);
410: }
411:
412: // If we (also) changed the response on the relay,
413: // send that reponse to the source if possible
414: // -- but I don't see how or why an incoming directive would say that
415: if ((flags & Relay.RESPONSE_CHANGE) != 0) {
416: // Note localResponse does nothing if this is the source (correctly)
417: localResponse(rt, Collections.EMPTY_SET);
418: }
419: }
420:
421: // called from lp.execute when get an incoming add directive
422: private void receiveAdd(RelayDirective.Add dir) {
423: UniqueObject uo = rootplan.findUniqueObject(dir.getUID());
424: if (!(uo instanceof Relay.Target) && uo != null) {
425: logger
426: .error(self
427: + ".receiveAdd RelayDirective.Add expected to find a Target on the BBoard, found: "
428: + uo + " for Directive " + dir
429: + ", source " + dir.getSource());
430: return;
431: }
432: Relay.Target rt = (Relay.Target) uo;
433: if (rt == null) {
434: addTarget(dir.getTargetFactory(), dir.getContent(), dir);
435: } else {
436: // Unusual. Treat as change
437: changeTarget(rt, dir.getContent(), Collections.EMPTY_SET);
438: }
439: }
440:
441: // Receive a change from remote Source at this Target
442: // called only from incoming directive to lp.execute
443: private void receiveChange(RelayDirective.Change dir,
444: Collection changes) {
445: Relay.Target rt = (Relay.Target) rootplan.findUniqueObject(dir
446: .getUID());
447: if (rt == null) {
448: // Unusual. Treat as add.
449: addTarget(dir.getTargetFactory(), dir.getContent(), dir);
450: } else {
451: // What if this is the source?
452: changeTarget(rt, dir.getContent(), changes);
453: }
454: }
455:
456: // called only from lp.execute when get a directive to remove this relay
457: private void receiveRemove(RelayDirective.Remove dir) {
458: Relay.Target rt = (Relay.Target) rootplan.findUniqueObject(dir
459: .getUID());
460: if (rt == null) {
461: // Unusual. Ignore.
462: } else {
463: rootplan.remove(rt);
464: }
465: }
466:
467: // called only from lp.execute
468: private void receiveResponse(RelayDirective.Response dir,
469: Collection changes) {
470: UniqueObject uo = rootplan.findUniqueObject(dir.getUID());
471: MessageAddress target = dir.getSource();
472: if (!(uo instanceof Relay.Source) && uo != null) {
473: // This is not legitimate. We'll get a ClassCastException below
474: // if we're not careful
475: logger
476: .error(
477: self
478: + ": receiveResponse got non Relay.Source (Bug 3202?). Got: "
479: + uo + " from the Response["
480: + dir.getUID() + "] with source "
481: + target + " and dest "
482: + dir.getDestination()
483: + ", response " + dir.getResponse(),
484: new Throwable());
485: return;
486: }
487: Relay.Source rs = (Relay.Source) uo;
488: // Relay.Source rs = (Relay.Source) rootplan.findUniqueObject(dir.getUID());
489: if (rs == null) {
490: // No longer part of our blackboard. Rescind it.
491: if (logger.isInfoEnabled())
492: logger
493: .info(self
494: + ": receiveResponse got NULL Relay.Source from the Response["
495: + dir.getUID() + "] with source "
496: + target + " and dest "
497: + dir.getDestination() + ", response "
498: + dir.getResponse());
499:
500: sendRemove(dir.getUID(), target);
501: } else {
502: Object resp = dir.getResponse();
503: if (resp != null) {
504: // Have a response. If the response changed, must locally
505: // publishChange the relay, but don't loop and resend the relay.
506: int flags = rs.updateResponse(target, resp);
507: if ((flags & Relay.RESPONSE_CHANGE) != 0) {
508: Collection c;
509: if (changes == null) {
510: c = new ArrayList(1);
511: } else {
512: c = new ArrayList(changes);
513: }
514: c.add(MarkerReport.INSTANCE);
515: // FIXME: Must I require that this Relay.Source in fact originated
516: // on the local agent before doing a publishChange?
517: // FIXME: Should dir.getDestination().getPrimary().equals(self)?
518: // And if this is also a target, should its source be this
519: // agent, or not necessarily? If I was trying to chain Relays,
520: // then The source field on a relay.target need not be the place
521: // that originated the relay.source implementation - which
522: // will be local... I'm confused
523: rootplan.change(rs, c);
524:
525: // Note that localResponse will do nothing
526: // if this is the Source for this target.
527: // This says that a downstream target
528: // told us they changed their response. If this is downstream
529: // of someone else, then send the response further upstream
530: if (rs instanceof Relay.Target)
531: localResponse((Relay.Target) rs, changes);
532: }
533:
534: // If (also) the content of the relay is different (from the source)
535: // then this lets us send the changes downstream maybe? But
536: // we just got the info from downstream?
537: // Or is this to check the targets list?
538: if ((flags & Relay.CONTENT_CHANGE) != 0) {
539: // localChange requires that this is the Source as well
540: localChange(rs, Collections.EMPTY_SET);
541: }
542: }
543: }
544: }
545:
546: // RestartLogicProvider implementation
547:
548: /**
549: * Agent restart handler. Resend all our Relay.Source again and
550: * send verification directives for all our Relay.Targets.
551: */
552: public void restart(final MessageAddress cid) {
553: if (logger.isInfoEnabled()) {
554: logger.info(self + ": Reconcile with "
555: + (cid == null ? "all agents" : cid.toString()));
556: }
557: UnaryPredicate pred = new UnaryPredicate() {
558: public boolean execute(Object o) {
559: return o instanceof Relay;
560: }
561: };
562:
563: // Loop over all Relays on the Blackboard
564: Enumeration en = rootplan.searchBlackboard(pred);
565: while (en.hasMoreElements()) {
566: Relay r = (Relay) en.nextElement();
567: // Resend all Relay.Sources
568: if (r instanceof Relay.Source) {
569: Relay.Source rs = (Relay.Source) r;
570: // What if it's also a target?
571: resend(rs, cid);
572: }
573:
574: // And for Targets, send back a verify to the source
575: if (r instanceof Relay.Target) {
576: Relay.Target rt = (Relay.Target) r;
577: verify(rt, cid);
578: }
579: }
580: if (logger.isInfoEnabled()) {
581: logger.info(self + ": Reconciled");
582: }
583: }
584:
585: // When someone is restarting, resend all Relays
586: private void resend(Relay.Source rs, MessageAddress t) {
587: Set targets = rs.getTargets();
588: if (targets == null)
589: return; // Not really a source
590: if (targets.isEmpty())
591: return;
592:
593: boolean gotContent = false; // Only grab the content once
594: Object content = null;
595:
596: // For each target
597: for (Iterator i = targets.iterator(); i.hasNext();) {
598: MessageAddress target = (MessageAddress) i.next();
599: if (target == null) {
600: // Ignore nulls.
601: } else if (target.getPrimary().equals(self)) {
602: // Don't send to ourself. Likely an error.
603: } else if (t != null
604: && !target.getPrimary().equals(t.getPrimary())) {
605: // Only resend to the specified address.
606: } else {
607: if (!gotContent) {
608: gotContent = true;
609: content = rs.getContent();
610: }
611: if (logger.isInfoEnabled()) {
612: logger.info(self + ": Resend"
613: + (t == null ? "*" : "") + " to " + target
614: + ": " + rs.getUID());
615: }
616:
617: // FIXME: Check that we're not sending to an ABA that includes this agent?
618:
619: // Caller ensures that Relay.Sources here
620: // really originated here
621: // Re-send that Relay as though it were new
622: sendAdd(rs, target, content);
623: }
624: }
625: }
626:
627: // Given address is restarting (or null). If it's the source
628: // of the given relay or null and the relay didn't start here,
629: // then send a verification
630: private void verify(Relay.Target rt, MessageAddress s) {
631: MessageAddress source = rt.getSource();
632: if (source == null)
633: return;
634: if (source.getPrimary().equals(self)) {
635: // Don't send to ourself. Likely an error.
636: return;
637: } else {
638:
639: // Sends a verification back to the source
640: // if the given address is null or the source address,
641: // ie if the source restarted or we did
642: if (s == null || source.getPrimary().equals(s.getPrimary())) {
643: if (logger.isInfoEnabled()) {
644: logger.info(self + ": Verify"
645: + (s == null ? "*" : "") + " to " + source
646: + ": " + rt.getUID());
647: }
648: sendVerification(rt, source);
649: }
650: }
651: }
652:
653: // ABAChange implementation
654: private static final UnaryPredicate relaySourcePred = new UnaryPredicate() {
655: public boolean execute(Object o) {
656: // FIXME: Somehow require it really is a source from here?
657: return o instanceof Relay.Source;
658: }
659: };
660:
661: // Implement ABAChangeLogicProvider.
662:
663: // Can get called from DomainAdapter.invokeABAChangeLogicProviders and from RootDomain.invokeABAChangeLogicProviders
664: // Distributor/Blackboard does one call - on the DomainService (ie DomainManager)
665: // DomainManager loops over the domains (so RootDomain)
666: // RootDomain is just an alternate implementation to DomainAdapter
667: // So really this is all happening from the cacheClearer thread in the
668: // Blackboard, inside some locks in the Distributor
669:
670: // Basically, this means that some ABA memberships (may have?) changed
671: // So we need to go through all Relay sources, look at the
672: // target lists, and if one is an ABA, get the translation,
673: // figure out what additions or removals there are. Send
674: // those adds/removes as necessary.
675:
676: // Note that when the Relay is initially published,
677: // no effort is made to translate the ABA
678:
679: // If we didn't change that the source really started here,
680: // then we'd get some relays that didn't start here and all targets
681: // of the relay would each try to send a remove/add to the
682: // changed members - duplicative at least. The only other
683: // way around this is if the targets had an empty targets list (via
684: // making it transient or a clever target factory).
685: public void abaChange(Set communities) {
686: if (logger.isDebugEnabled())
687: logger.debug(self + ": abaChange");
688: Enumeration en = rootplan.searchBlackboard(relaySourcePred);
689: while (en.hasMoreElements()) {
690: Relay.Source rs = (Relay.Source) en.nextElement();
691: Set targets = rs.getTargets();
692: if (targets != null && !targets.isEmpty()) {
693: Set oldTranslation = Collections.EMPTY_SET;
694: Set newTranslation = Collections.EMPTY_SET;
695: for (Iterator i = targets.iterator(); i.hasNext();) {
696: Object o = i.next();
697: if (o instanceof AttributeBasedAddress) {
698: AttributeBasedAddress aba = (AttributeBasedAddress) o;
699: if (communities
700: .contains(aba.getCommunityName())) {
701: ABATranslation abaTranslation = rootplan
702: .getABATranslation(aba);
703: if (abaTranslation != null) {
704: Collection oldC = abaTranslation
705: .getOldTranslation();
706: if (oldC != null && !oldC.isEmpty()) {
707: if (oldTranslation.isEmpty()) {
708: oldTranslation = new HashSet();
709: }
710: oldTranslation.addAll(oldC);
711: }
712: Collection newC = abaTranslation
713: .getCurrentTranslation();
714: if (newC != null && !newC.isEmpty()) {
715: if (newTranslation.isEmpty()) {
716: newTranslation = new HashSet();
717: }
718: newTranslation.addAll(newC);
719: }
720: }
721: }
722: }
723: }
724: if (!newTranslation.equals(oldTranslation)) {
725: Set adds = new HashSet(newTranslation);
726: Set removes = new HashSet(oldTranslation);
727: adds.removeAll(oldTranslation);
728: removes.removeAll(newTranslation);
729: boolean isNOP = adds.isEmpty() && removes.isEmpty();
730: if (isNOP && logger.isDebugEnabled()) {
731: logger.debug("old " + oldTranslation);
732: logger.debug("new " + newTranslation);
733: logger.debug("Rmv " + removes + " from " + rs);
734: logger.debug("Add " + adds + " to " + rs);
735: }
736: if (!isNOP && logger.isInfoEnabled()) {
737: logger.info("old " + oldTranslation);
738: logger.info("new " + newTranslation);
739: logger.info("Rmv " + removes + " from " + rs);
740: logger.info("Add " + adds + " to " + rs);
741: }
742: if (!removes.isEmpty()) {
743: localRemove(rs.getUID(), removes);
744: }
745: if (!adds.isEmpty()) {
746: localAdd(rs, adds);
747: }
748: }
749: }
750: }
751: }
752:
753: /**
754: * ChangeReport for this LP to identify its own changes.
755: */
756: private static final class MarkerReport implements ChangeReport {
757: public static final MarkerReport INSTANCE = new MarkerReport();
758:
759: private MarkerReport() {
760: }
761:
762: private Object readResolve() {
763: return INSTANCE;
764: }
765:
766: public String toString() {
767: return "relay-marker-report";
768: }
769:
770: static final long serialVersionUID = 9091843781928322223L;
771: }
772:
773: /**
774: * Token implementation, private to RelayLP.
775: * <p>
776: * Keeps a map of (agent->token), which allows rehydrated
777: * relay objects to use "==" token matching.
778: */
779: private static final class TokenImpl extends Relay.Token {
780: private static final Map tokens = new HashMap(13);
781: private final MessageAddress addr;
782:
783: public static TokenImpl getToken(MessageAddress addr) {
784: synchronized (tokens) {
785: TokenImpl t = (TokenImpl) tokens.get(addr);
786: if (t == null) {
787: t = new TokenImpl(addr);
788: tokens.put(addr, t);
789: }
790: return t;
791: }
792: }
793:
794: private TokenImpl(MessageAddress addr) {
795: this .addr = addr;
796: }
797:
798: private Object readResolve() {
799: return getToken(addr);
800: }
801:
802: public String toString() {
803: return "<token " + addr + ">";
804: }
805:
806: static final long serialVersionUID = 3878912876728718092L;
807: }
808: }
|