Source Code Cross Referenced for RelayLP.java in  » Science » Cougaar12_4 » org » cougaar » core » relay » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.relay 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 1997-2004 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:
027:        package org.cougaar.core.relay;
028:
029:        import java.util.ArrayList;
030:        import java.util.Collection;
031:        import java.util.Collections;
032:        import java.util.Enumeration;
033:        import java.util.HashMap;
034:        import java.util.HashSet;
035:        import java.util.Iterator;
036:        import java.util.Map;
037:        import java.util.Set;
038:
039:        import org.cougaar.core.blackboard.ABATranslation;
040:        import org.cougaar.core.blackboard.ChangeReport;
041:        import org.cougaar.core.blackboard.Directive;
042:        import org.cougaar.core.blackboard.EnvelopeTuple;
043:        import org.cougaar.core.domain.ABAChangeLogicProvider;
044:        import org.cougaar.core.domain.EnvelopeLogicProvider;
045:        import org.cougaar.core.domain.LogicProvider;
046:        import org.cougaar.core.domain.MessageLogicProvider;
047:        import org.cougaar.core.domain.RestartLogicProvider;
048:        import org.cougaar.core.domain.RootPlan;
049:        import org.cougaar.core.mts.MessageAddress;
050:        import org.cougaar.core.util.UID;
051:        import org.cougaar.core.util.UniqueObject;
052:        import org.cougaar.multicast.AttributeBasedAddress;
053:        import org.cougaar.util.UnaryPredicate;
054:        import org.cougaar.util.log.Logger;
055:        import org.cougaar.util.log.LoggerFactory;
056:
057:        /**
058:         * A {@link LogicProvider} to transmit and update {@link Relay}
059:         * objects.
060:         *
061:         * @see Relay
062:         */
063:        public class RelayLP implements  LogicProvider, EnvelopeLogicProvider,
064:                MessageLogicProvider, RestartLogicProvider,
065:                ABAChangeLogicProvider {
066:            private final RootPlan rootplan;
067:            private final MessageAddress self;
068:            private final Relay.Token token;
069:
070:            private final Logger logger = LoggerFactory.getInstance()
071:                    .createLogger(getClass());
072:
073:            public RelayLP(RootPlan rootplan, MessageAddress self) {
074:                this .rootplan = rootplan;
075:                this .self = self;
076:                token = TokenImpl.getToken(self);
077:            }
078:
079:            public void init() {
080:            }
081:
082:            // EnvelopeLogicProvider implementation
083:            /**
084:             * Sends the Content of Relay sources to the their targets and sends
085:             * target responses back to the source.
086:             * @param o an EnvelopeTuple where the tuple.object is
087:             *    a Relay.Source or Relay.Target
088:             */
089:            public void execute(EnvelopeTuple o, Collection changes) {
090:                Object obj = o.getObject();
091:                if (obj instanceof  Relay) { // Quick test for Target or Source
092:                    if (changes != null
093:                            && changes.contains(MarkerReport.INSTANCE)) {
094:                        // Ignore changes containing our MarkerReport
095:                        // This avoids looping
096:                        return;
097:                    }
098:                    if (obj instanceof  Relay.Target) {
099:                        Relay.Target rt = (Relay.Target) obj;
100:                        // Only changes are significant at a Target
101:                        // The target is sending a response back to the source
102:                        if (o.isChange()) {
103:                            localResponse(rt, changes);
104:                        }
105:                    }
106:
107:                    // Note no else -- so something both a Target and a Relay
108:                    // will run through all of these
109:
110:                    if (obj instanceof  Relay.Source) {
111:                        Relay.Source rs = (Relay.Source) obj;
112:                        if (o.isAdd()) {
113:                            // New relay to be sent to targets.
114:                            // Note that a Relay.Target just published at Dest that's also
115:                            // a Relay.Source would get in here -- so must have the MarkerReport
116:                            localAdd(rs);
117:                        } else if (o.isChange()) {
118:                            // New relay content or targets list
119:                            // Note that a Relay.Target just changed at Dest that's also
120:                            // a Relay.Source would get in here -- so must have the MarkerReport
121:                            localChange(rs, changes);
122:                        } else if (o.isRemove()) {
123:                            // Remove the relay from the Targets
124:                            // Note that a Relay.Target just changed at Dest that's also
125:                            // a Relay.Source would get in here -- so must have the MarkerReport
126:                            localRemove(rs);
127:                        }
128:                    }
129:                }
130:            }
131:
132:            // New Relay.Source added. Only called from LP.execute()
133:            private void localAdd(Relay.Source rs) {
134:                Set targets = rs.getTargets();
135:                if (targets == null)
136:                    return;
137:                if (targets.isEmpty())
138:                    return; // No targets
139:                localAdd(rs, targets);
140:            }
141:
142:            // Propogate the new Relay to each listed target
143:            // Called from above localAdd and from abaChange when an aba expands
144:            // to new targets.
145:            private void localAdd(Relay.Source rs, Set targets) {
146:                // If this were also a target, we could check that this agent
147:                // is the source. That might help break looping
148:                boolean gotContent = false;
149:                Object content = null;
150:                for (Iterator i = targets.iterator(); i.hasNext();) {
151:                    MessageAddress target = (MessageAddress) i.next();
152:                    if (target == null) {
153:                        // Ignore nulls.
154:                    } else if (target.getPrimary().equals(self)) {
155:                        // Never send to self.  Likely an error.
156:                    } else {
157:                        if (!gotContent) {
158:                            gotContent = true;
159:                            content = rs.getContent();
160:                        }
161:                        sendAdd(rs, target, content);
162:                    }
163:                }
164:            }
165:
166:            /**
167:             * Handle a change to this source. We need to send the new content
168:             * to the targets.
169:             */
170:            private void localChange(Relay.Source rs, Collection changes) {
171:                // called from changeTarget, receiveResponse, and LP.execute
172:                Set targets = rs.getTargets();
173:                Collection oldTargets = null;
174:                // Get the oldtargets mentioned in the _first_ RelayChangeReport 
175:                // (if there are many, later ones are ignored)
176:                if (changes != null) {
177:                    for (Iterator i = changes.iterator(); i.hasNext();) {
178:                        Object o = i.next();
179:                        if (o instanceof  RelayChangeReport) {
180:                            if (oldTargets == null) {
181:                                RelayChangeReport rcr = (RelayChangeReport) o;
182:                                oldTargets = rcr.getOldTargets();
183:                            }
184:                            i.remove();
185:                        }
186:                    }
187:                }
188:
189:                // If we got targets from a ChangeReport above, winnow that
190:                // down to targets no longer in the targets list.
191:                // Tell each such agent to remove this Relay
192:                if (oldTargets != null) {
193:                    if (targets != null)
194:                        oldTargets.removeAll(targets);
195:                    UID uid = rs.getUID();
196:                    for (Iterator i = oldTargets.iterator(); i.hasNext();) {
197:                        MessageAddress target = (MessageAddress) i.next();
198:                        sendRemove(uid, target);
199:                    }
200:                }
201:                if (targets == null || targets.isEmpty()) {
202:                    return; // No targets
203:                }
204:
205:                // FIXME check for targets-change-report:
206:                //   calculate set differences
207:                //   for added targets: sendAdd
208:                //   for removed targets: sendRemove
209:                // add ContentReport to changes
210:                boolean gotContent = false;
211:                Object content = null;
212:                for (Iterator i = targets.iterator(); i.hasNext();) {
213:                    MessageAddress target = (MessageAddress) i.next();
214:                    if (target == null) {
215:                        // Ignore nulls.
216:                    } else if (target.getPrimary().equals(self)) {
217:                        // Never send to self.  Likely an error.
218:                    } else {
219:                        if (!gotContent) {
220:                            gotContent = true;
221:                            content = rs.getContent();
222:                        }
223:                        // This target could be an ABA that includes this agent, right?
224:                        sendChange(rs, target, content, changes);
225:                    }
226:                }
227:            }
228:
229:            // Local Relay.Source was publishRemoved
230:            // Called from lp.execute
231:            private void localRemove(Relay.Source rs) {
232:                Set targets = rs.getTargets();
233:                if (targets == null)
234:                    return;
235:                if (targets.isEmpty())
236:                    return; // No targets
237:                // Again, if this is also a Relay.Target, could check that this is 
238:                // really the source
239:                localRemove(rs.getUID(), targets);
240:            }
241:
242:            // Propogate removal of relay to each target
243:            // called from above, ie lp.execute, and from abaChange
244:            private void localRemove(UID uid, Set targets) {
245:                for (Iterator i = targets.iterator(); i.hasNext();) {
246:                    MessageAddress target = (MessageAddress) i.next();
247:                    if (target == null) {
248:                        // Ignore nulls.
249:                    } else if (target.getPrimary().equals(self)) {
250:                        // Never send to self.  Likely an error.
251:                    } else {
252:                        // Again, what if the target is an ABA that includes this agent?
253:                        sendRemove(uid, target);
254:                    }
255:                }
256:            }
257:
258:            /**
259:             * Handle a change to this target. We need to send the new response
260:             * to the source
261:             */
262:            private void localResponse(Relay.Target rt, Collection changes) {
263:                // called from changeTarget, receiveResponse, LP.execute
264:                MessageAddress source = rt.getSource();
265:                if (source == null)
266:                    return; // No source
267:                if (self.equals(source.getPrimary()))
268:                    return; // BOGUS source must be elsewhere. Ignore.
269:
270:                Object resp = rt.getResponse();
271:                // cancel if response is null
272:                if (resp == null)
273:                    return;
274:
275:                sendResponse(rt, source, resp, changes);
276:            }
277:
278:            // Send directive to given target Agent to add this Relay
279:            // called from localAdd and resend
280:            private void sendAdd(Relay.Source rs, MessageAddress target,
281:                    Object content) {
282:                RelayDirective.Add dir = new RelayDirective.Add(rs.getUID(),
283:                        content, rs.getTargetFactory());
284:                dir.setSource(self);
285:                dir.setDestination(target);
286:                rootplan.sendDirective(dir);
287:            }
288:
289:            // Send directive to given target Agent of change to this Relay
290:            // called from localChange
291:            private void sendChange(Relay.Source rs, MessageAddress target,
292:                    Object content, Collection c) {
293:                RelayDirective.Change dir = new RelayDirective.Change(rs
294:                        .getUID(), content, rs.getTargetFactory());
295:                dir.setSource(self);
296:                dir.setDestination(target);
297:                rootplan.sendDirective(dir, c);
298:            }
299:
300:            // Send directive to given target agent to remove this Relay
301:            // called from localChange, localRemove, receiveResponse 
302:            private void sendRemove(UID uid, MessageAddress target) {
303:                RelayDirective.Remove dir = new RelayDirective.Remove(uid);
304:                dir.setSource(self);
305:                dir.setDestination(target);
306:                rootplan.sendDirective(dir);
307:            }
308:
309:            // Send directive back to the Source of Response from this Target
310:            // called from sendVerification, addTarget, localResponse
311:            private void sendResponse(Relay.Target rt, MessageAddress source,
312:                    Object resp, Collection c) {
313:                RelayDirective.Response dir = new RelayDirective.Response(rt
314:                        .getUID(), resp);
315:                dir.setSource(self);
316:                dir.setDestination(source);
317:                rootplan.sendDirective(dir, c);
318:            }
319:
320:            // Resend latest (possibly null) response from this target to the source
321:            // called from verify
322:            private void sendVerification(Relay.Target rt, MessageAddress source) {
323:                Object resp = rt.getResponse();
324:                // Send even if null response
325:                sendResponse(rt, source, resp, Collections.EMPTY_SET);
326:            }
327:
328:            // MessageLogicProvider implementation
329:            public void execute(Directive dir, Collection changes) {
330:                if (dir instanceof  RelayDirective) {
331:                    // Quick test for one of ours
332:                    if (self.equals(dir.getSource().getPrimary()))
333:                        return;
334:
335:                    if (dir instanceof  RelayDirective.Change) {
336:                        receiveChange((RelayDirective.Change) dir, changes);
337:                        return;
338:                    }
339:                    if (dir instanceof  RelayDirective.Add) {
340:                        receiveAdd((RelayDirective.Add) dir);
341:                        return;
342:                    }
343:                    if (dir instanceof  RelayDirective.Remove) {
344:                        receiveRemove((RelayDirective.Remove) dir);
345:                        return;
346:                    }
347:                    if (dir instanceof  RelayDirective.Response) {
348:                        receiveResponse((RelayDirective.Response) dir, changes);
349:                        return;
350:                    }
351:                }
352:            }
353:
354:            // called from receiveAdd and receiveChange
355:            // In the target agent, add the Relay.Target (which may also implement Relay.Source)
356:            private void addTarget(Relay.TargetFactory tf, Object cont,
357:                    RelayDirective dir) {
358:                Relay.Target rt;
359:                if (tf != null) {
360:                    rt = tf.create(dir.getUID(), dir.getSource(), cont, token);
361:                } else if (cont instanceof  Relay.Target) {
362:                    rt = (Relay.Target) cont;
363:                } else {
364:                    // ERROR cannot create target
365:                    return;
366:                }
367:                if (rt == null)
368:                    return; // Target should not exist here
369:                // Add the target. Note that if it is also a source,
370:                // this LP will wake up again, and try to send the relay
371:                // to all the targets.
372:                /// FIXME: This is a place to block relaying. Add a Marker report?
373:                rootplan.add(rt);
374:                // Check for immediate response due to arrival
375:                Object resp = rt.getResponse();
376:                if (resp != null) {
377:                    sendResponse(rt, dir.getSource(), resp,
378:                            Collections.EMPTY_SET);
379:                }
380:            }
381:
382:            // called from receiveAdd and receiveChange
383:            private void changeTarget(Relay.Target rt, Object cont,
384:                    Collection changes) {
385:                // Branch on the change type flag.
386:                // If the content changed, then mark the taret as changed,
387:                // but in such a way that this LP won't run again
388:                int flags = rt.updateContent(cont, token);
389:                if ((flags & Relay.CONTENT_CHANGE) != 0) {
390:                    Collection c;
391:                    if (changes == null) {
392:                        c = Collections.singleton(MarkerReport.INSTANCE);
393:                    } else {
394:                        c = new ArrayList(changes);
395:                        c.add(MarkerReport.INSTANCE);
396:                    }
397:                    // Note the MarkerReport is on this change,
398:                    // so the LP will not think the Response changed
399:                    // and needs to flow back
400:                    rootplan.change(rt, c);
401:                    // FIXME: What is this for?!!
402:                    // Note I made localChange bail if this is not the Source
403:                    // Presumably this is for chaining. It means that if a content
404:                    // change comes in to this agent, and the local Target is also
405:                    // a source, we can let this LP pretend the change
406:                    // was local, and propogate it to the listed targets
407:                    // FIXME!!
408:                    if (rt instanceof  Relay.Source)
409:                        localChange((Relay.Source) rt, changes);
410:                }
411:
412:                // If we (also) changed the response on the relay,
413:                // send that reponse to the source if possible
414:                // -- but I don't see how or why an incoming directive would say that
415:                if ((flags & Relay.RESPONSE_CHANGE) != 0) {
416:                    // Note localResponse does nothing if this is the source (correctly)
417:                    localResponse(rt, Collections.EMPTY_SET);
418:                }
419:            }
420:
421:            // called from lp.execute when get an incoming add directive
422:            private void receiveAdd(RelayDirective.Add dir) {
423:                UniqueObject uo = rootplan.findUniqueObject(dir.getUID());
424:                if (!(uo instanceof  Relay.Target) && uo != null) {
425:                    logger
426:                            .error(self
427:                                    + ".receiveAdd RelayDirective.Add expected to find a Target on the BBoard, found: "
428:                                    + uo + " for Directive " + dir
429:                                    + ", source " + dir.getSource());
430:                    return;
431:                }
432:                Relay.Target rt = (Relay.Target) uo;
433:                if (rt == null) {
434:                    addTarget(dir.getTargetFactory(), dir.getContent(), dir);
435:                } else {
436:                    // Unusual. Treat as change
437:                    changeTarget(rt, dir.getContent(), Collections.EMPTY_SET);
438:                }
439:            }
440:
441:            // Receive a change from remote Source at this Target
442:            // called only from incoming directive to lp.execute
443:            private void receiveChange(RelayDirective.Change dir,
444:                    Collection changes) {
445:                Relay.Target rt = (Relay.Target) rootplan.findUniqueObject(dir
446:                        .getUID());
447:                if (rt == null) {
448:                    // Unusual. Treat as add.
449:                    addTarget(dir.getTargetFactory(), dir.getContent(), dir);
450:                } else {
451:                    // What if this is the source?
452:                    changeTarget(rt, dir.getContent(), changes);
453:                }
454:            }
455:
456:            // called only from lp.execute when get a directive to remove this relay
457:            private void receiveRemove(RelayDirective.Remove dir) {
458:                Relay.Target rt = (Relay.Target) rootplan.findUniqueObject(dir
459:                        .getUID());
460:                if (rt == null) {
461:                    // Unusual. Ignore.
462:                } else {
463:                    rootplan.remove(rt);
464:                }
465:            }
466:
467:            // called only from lp.execute
468:            private void receiveResponse(RelayDirective.Response dir,
469:                    Collection changes) {
470:                UniqueObject uo = rootplan.findUniqueObject(dir.getUID());
471:                MessageAddress target = dir.getSource();
472:                if (!(uo instanceof  Relay.Source) && uo != null) {
473:                    // This is not legitimate. We'll get a ClassCastException below
474:                    // if we're not careful
475:                    logger
476:                            .error(
477:                                    self
478:                                            + ": receiveResponse got non Relay.Source (Bug 3202?). Got: "
479:                                            + uo + " from the Response["
480:                                            + dir.getUID() + "] with source "
481:                                            + target + " and dest "
482:                                            + dir.getDestination()
483:                                            + ", response " + dir.getResponse(),
484:                                    new Throwable());
485:                    return;
486:                }
487:                Relay.Source rs = (Relay.Source) uo;
488:                //    Relay.Source rs = (Relay.Source) rootplan.findUniqueObject(dir.getUID());
489:                if (rs == null) {
490:                    // No longer part of our blackboard. Rescind it.
491:                    if (logger.isInfoEnabled())
492:                        logger
493:                                .info(self
494:                                        + ": receiveResponse got NULL Relay.Source from the Response["
495:                                        + dir.getUID() + "] with source "
496:                                        + target + " and dest "
497:                                        + dir.getDestination() + ", response "
498:                                        + dir.getResponse());
499:
500:                    sendRemove(dir.getUID(), target);
501:                } else {
502:                    Object resp = dir.getResponse();
503:                    if (resp != null) {
504:                        // Have a response. If the response changed, must locally 
505:                        // publishChange the relay, but don't loop and resend the relay.
506:                        int flags = rs.updateResponse(target, resp);
507:                        if ((flags & Relay.RESPONSE_CHANGE) != 0) {
508:                            Collection c;
509:                            if (changes == null) {
510:                                c = new ArrayList(1);
511:                            } else {
512:                                c = new ArrayList(changes);
513:                            }
514:                            c.add(MarkerReport.INSTANCE);
515:                            // FIXME: Must I require that this Relay.Source in fact originated
516:                            // on the local agent before doing a publishChange?
517:                            // FIXME: Should dir.getDestination().getPrimary().equals(self)?
518:                            // And if this is also a target, should its source be this
519:                            // agent, or not necessarily? If I was trying to chain Relays,
520:                            // then The source field on a relay.target need not be the place
521:                            // that originated the relay.source implementation - which
522:                            // will be local... I'm confused
523:                            rootplan.change(rs, c);
524:
525:                            // Note that localResponse will do nothing
526:                            // if this is the Source for this target. 
527:                            // This says that a downstream target
528:                            // told us they changed their response. If this is downstream
529:                            // of someone else, then send the response further upstream
530:                            if (rs instanceof  Relay.Target)
531:                                localResponse((Relay.Target) rs, changes);
532:                        }
533:
534:                        // If (also) the content of the relay is different (from the source)
535:                        // then this lets us send the changes downstream maybe? But
536:                        // we just got the info from downstream?
537:                        // Or is this to check the targets list?
538:                        if ((flags & Relay.CONTENT_CHANGE) != 0) {
539:                            // localChange requires that this is the Source as well
540:                            localChange(rs, Collections.EMPTY_SET);
541:                        }
542:                    }
543:                }
544:            }
545:
546:            // RestartLogicProvider implementation
547:
548:            /**
549:             * Agent restart handler. Resend all our Relay.Source again and
550:             * send verification directives for all our Relay.Targets.
551:             */
552:            public void restart(final MessageAddress cid) {
553:                if (logger.isInfoEnabled()) {
554:                    logger.info(self + ": Reconcile with "
555:                            + (cid == null ? "all agents" : cid.toString()));
556:                }
557:                UnaryPredicate pred = new UnaryPredicate() {
558:                    public boolean execute(Object o) {
559:                        return o instanceof  Relay;
560:                    }
561:                };
562:
563:                // Loop over all Relays on the Blackboard
564:                Enumeration en = rootplan.searchBlackboard(pred);
565:                while (en.hasMoreElements()) {
566:                    Relay r = (Relay) en.nextElement();
567:                    // Resend all Relay.Sources
568:                    if (r instanceof  Relay.Source) {
569:                        Relay.Source rs = (Relay.Source) r;
570:                        // What if it's also a target?
571:                        resend(rs, cid);
572:                    }
573:
574:                    // And for Targets, send back a verify to the source
575:                    if (r instanceof  Relay.Target) {
576:                        Relay.Target rt = (Relay.Target) r;
577:                        verify(rt, cid);
578:                    }
579:                }
580:                if (logger.isInfoEnabled()) {
581:                    logger.info(self + ": Reconciled");
582:                }
583:            }
584:
585:            // When someone is restarting, resend all Relays
586:            private void resend(Relay.Source rs, MessageAddress t) {
587:                Set targets = rs.getTargets();
588:                if (targets == null)
589:                    return; // Not really a source
590:                if (targets.isEmpty())
591:                    return;
592:
593:                boolean gotContent = false; // Only grab the content once
594:                Object content = null;
595:
596:                // For each target
597:                for (Iterator i = targets.iterator(); i.hasNext();) {
598:                    MessageAddress target = (MessageAddress) i.next();
599:                    if (target == null) {
600:                        // Ignore nulls.
601:                    } else if (target.getPrimary().equals(self)) {
602:                        // Don't send to ourself.  Likely an error.
603:                    } else if (t != null
604:                            && !target.getPrimary().equals(t.getPrimary())) {
605:                        // Only resend to the specified address.
606:                    } else {
607:                        if (!gotContent) {
608:                            gotContent = true;
609:                            content = rs.getContent();
610:                        }
611:                        if (logger.isInfoEnabled()) {
612:                            logger.info(self + ": Resend"
613:                                    + (t == null ? "*" : "") + " to " + target
614:                                    + ": " + rs.getUID());
615:                        }
616:
617:                        // FIXME: Check that we're not sending to an ABA that includes this agent?
618:
619:                        // Caller ensures that Relay.Sources here
620:                        // really originated here
621:                        // Re-send that Relay as though it were new
622:                        sendAdd(rs, target, content);
623:                    }
624:                }
625:            }
626:
627:            // Given address is restarting (or null). If it's the source
628:            // of the given relay or null and the relay didn't start here,
629:            // then send a verification
630:            private void verify(Relay.Target rt, MessageAddress s) {
631:                MessageAddress source = rt.getSource();
632:                if (source == null)
633:                    return;
634:                if (source.getPrimary().equals(self)) {
635:                    // Don't send to ourself.  Likely an error.
636:                    return;
637:                } else {
638:
639:                    // Sends a verification back to the source
640:                    // if the given address is null or the source address,
641:                    // ie if the source restarted or we did
642:                    if (s == null || source.getPrimary().equals(s.getPrimary())) {
643:                        if (logger.isInfoEnabled()) {
644:                            logger.info(self + ": Verify"
645:                                    + (s == null ? "*" : "") + " to " + source
646:                                    + ": " + rt.getUID());
647:                        }
648:                        sendVerification(rt, source);
649:                    }
650:                }
651:            }
652:
653:            // ABAChange implementation
654:            private static final UnaryPredicate relaySourcePred = new UnaryPredicate() {
655:                public boolean execute(Object o) {
656:                    // FIXME: Somehow require it really is a source from here?
657:                    return o instanceof  Relay.Source;
658:                }
659:            };
660:
661:            // Implement ABAChangeLogicProvider.
662:
663:            // Can get called from DomainAdapter.invokeABAChangeLogicProviders and from RootDomain.invokeABAChangeLogicProviders
664:            // Distributor/Blackboard does one call - on the DomainService (ie DomainManager)
665:            // DomainManager loops over the domains (so RootDomain)
666:            // RootDomain is just an alternate implementation to DomainAdapter
667:            // So really this is all happening from the cacheClearer thread in the
668:            // Blackboard, inside some locks in the Distributor
669:
670:            // Basically, this means that some ABA memberships (may have?) changed
671:            // So we need to go through all Relay sources, look at the
672:            // target lists, and if one is an ABA, get the translation,
673:            // figure out what additions or removals there are. Send
674:            // those adds/removes as necessary.
675:
676:            // Note that when the Relay is initially published,
677:            // no effort is made to translate the ABA
678:
679:            // If we didn't change that the source really started here,
680:            // then we'd get some relays that didn't start here and all targets
681:            // of the relay would each try to send a remove/add to the
682:            // changed members - duplicative at least. The only other
683:            // way around this is if the targets had an empty targets list (via
684:            // making it transient or a clever target factory).
685:            public void abaChange(Set communities) {
686:                if (logger.isDebugEnabled())
687:                    logger.debug(self + ": abaChange");
688:                Enumeration en = rootplan.searchBlackboard(relaySourcePred);
689:                while (en.hasMoreElements()) {
690:                    Relay.Source rs = (Relay.Source) en.nextElement();
691:                    Set targets = rs.getTargets();
692:                    if (targets != null && !targets.isEmpty()) {
693:                        Set oldTranslation = Collections.EMPTY_SET;
694:                        Set newTranslation = Collections.EMPTY_SET;
695:                        for (Iterator i = targets.iterator(); i.hasNext();) {
696:                            Object o = i.next();
697:                            if (o instanceof  AttributeBasedAddress) {
698:                                AttributeBasedAddress aba = (AttributeBasedAddress) o;
699:                                if (communities
700:                                        .contains(aba.getCommunityName())) {
701:                                    ABATranslation abaTranslation = rootplan
702:                                            .getABATranslation(aba);
703:                                    if (abaTranslation != null) {
704:                                        Collection oldC = abaTranslation
705:                                                .getOldTranslation();
706:                                        if (oldC != null && !oldC.isEmpty()) {
707:                                            if (oldTranslation.isEmpty()) {
708:                                                oldTranslation = new HashSet();
709:                                            }
710:                                            oldTranslation.addAll(oldC);
711:                                        }
712:                                        Collection newC = abaTranslation
713:                                                .getCurrentTranslation();
714:                                        if (newC != null && !newC.isEmpty()) {
715:                                            if (newTranslation.isEmpty()) {
716:                                                newTranslation = new HashSet();
717:                                            }
718:                                            newTranslation.addAll(newC);
719:                                        }
720:                                    }
721:                                }
722:                            }
723:                        }
724:                        if (!newTranslation.equals(oldTranslation)) {
725:                            Set adds = new HashSet(newTranslation);
726:                            Set removes = new HashSet(oldTranslation);
727:                            adds.removeAll(oldTranslation);
728:                            removes.removeAll(newTranslation);
729:                            boolean isNOP = adds.isEmpty() && removes.isEmpty();
730:                            if (isNOP && logger.isDebugEnabled()) {
731:                                logger.debug("old " + oldTranslation);
732:                                logger.debug("new " + newTranslation);
733:                                logger.debug("Rmv " + removes + " from " + rs);
734:                                logger.debug("Add " + adds + " to " + rs);
735:                            }
736:                            if (!isNOP && logger.isInfoEnabled()) {
737:                                logger.info("old " + oldTranslation);
738:                                logger.info("new " + newTranslation);
739:                                logger.info("Rmv " + removes + " from " + rs);
740:                                logger.info("Add " + adds + " to " + rs);
741:                            }
742:                            if (!removes.isEmpty()) {
743:                                localRemove(rs.getUID(), removes);
744:                            }
745:                            if (!adds.isEmpty()) {
746:                                localAdd(rs, adds);
747:                            }
748:                        }
749:                    }
750:                }
751:            }
752:
753:            /** 
754:             * ChangeReport for this LP to identify its own changes.
755:             */
756:            private static final class MarkerReport implements  ChangeReport {
757:                public static final MarkerReport INSTANCE = new MarkerReport();
758:
759:                private MarkerReport() {
760:                }
761:
762:                private Object readResolve() {
763:                    return INSTANCE;
764:                }
765:
766:                public String toString() {
767:                    return "relay-marker-report";
768:                }
769:
770:                static final long serialVersionUID = 9091843781928322223L;
771:            }
772:
773:            /** 
774:             * Token implementation, private to RelayLP.
775:             * <p>
776:             * Keeps a map of (agent-&gt;token), which allows rehydrated
777:             * relay objects to use "==" token matching.
778:             */
779:            private static final class TokenImpl extends Relay.Token {
780:                private static final Map tokens = new HashMap(13);
781:                private final MessageAddress addr;
782:
783:                public static TokenImpl getToken(MessageAddress addr) {
784:                    synchronized (tokens) {
785:                        TokenImpl t = (TokenImpl) tokens.get(addr);
786:                        if (t == null) {
787:                            t = new TokenImpl(addr);
788:                            tokens.put(addr, t);
789:                        }
790:                        return t;
791:                    }
792:                }
793:
794:                private TokenImpl(MessageAddress addr) {
795:                    this .addr = addr;
796:                }
797:
798:                private Object readResolve() {
799:                    return getToken(addr);
800:                }
801:
802:                public String toString() {
803:                    return "<token " + addr + ">";
804:                }
805:
806:                static final long serialVersionUID = 3878912876728718092L;
807:            }
808:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.