Source Code Cross Referenced for ServerTransport.java in  » Science » Cougaar12_4 » org » cougaar » core » wp » server » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.wp.server 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 2002-2004 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:
027:        package org.cougaar.core.wp.server;
028:
029:        import java.net.URI;
030:        import java.util.ArrayList;
031:        import java.util.Collection;
032:        import java.util.Collections;
033:        import java.util.HashSet;
034:        import java.util.Iterator;
035:        import java.util.List;
036:        import java.util.Map;
037:        import java.util.Set;
038:        import org.cougaar.bootstrap.SystemProperties;
039:        import org.cougaar.core.agent.service.MessageSwitchService;
040:        import org.cougaar.core.component.Component;
041:        import org.cougaar.core.component.Service;
042:        import org.cougaar.core.component.ServiceAvailableEvent;
043:        import org.cougaar.core.component.ServiceAvailableListener;
044:        import org.cougaar.core.component.ServiceBroker;
045:        import org.cougaar.core.component.ServiceListener;
046:        import org.cougaar.core.component.ServiceProvider;
047:        import org.cougaar.core.component.ServiceRevokedListener;
048:        import org.cougaar.core.mts.Message;
049:        import org.cougaar.core.mts.MessageAddress;
050:        import org.cougaar.core.mts.MessageHandler;
051:        import org.cougaar.core.service.AgentIdentificationService;
052:        import org.cougaar.core.service.LoggingService;
053:        import org.cougaar.core.service.ThreadService;
054:        import org.cougaar.core.service.wp.AddressEntry;
055:        import org.cougaar.core.service.wp.Callback;
056:        import org.cougaar.core.service.wp.Response;
057:        import org.cougaar.core.service.wp.WhitePagesService;
058:        import org.cougaar.core.thread.Schedulable;
059:        import org.cougaar.core.wp.MessageTimeoutUtils;
060:        import org.cougaar.core.wp.Parameters;
061:        import org.cougaar.core.wp.WhitePagesMessage;
062:        import org.cougaar.core.wp.bootstrap.PeersService;
063:        import org.cougaar.core.wp.resolver.ServiceProviderBase;
064:        import org.cougaar.core.wp.resolver.WPAnswer;
065:        import org.cougaar.core.wp.resolver.WPQuery;
066:        import org.cougaar.util.GenericStateModelAdapter;
067:        import org.cougaar.util.RarelyModifiedList;
068:
069:        /**
070:         * This component sends and receives messages for the {@link
071:         * RootAuthority}.
072:         * <p>
073:         * This component is responsible for the server-side hierarchy
074:         * traversal and replication.
075:         */
076:        public class ServerTransport extends GenericStateModelAdapter implements 
077:                Component {
078:
079:            /**
080:             * Should timestamps be relative to the server's clock or
081:             * the client's measured round-trip-time?  Read once, when this
082:             * class is initialized, via SystemProperties.getBoolean.
083:             * @see WPAnswer
084:             */
085:            private static final boolean USE_SERVER_TIME = SystemProperties
086:                    .getBoolean("org.cougaar.core.wp.server.useServerTime");
087:
088:            // action code for received WPAnswers; must not conflict with WPQuery
089:            private static final int FORWARD_ANSWER = 4;
090:
091:            private ServerTransportConfig config; // created once, by configure(Object)
092:
093:            private ServiceBroker sb; // set by setServiceBroker
094:            private LoggingService logger; // set by setLoggingService, nulled by unload()
095:            private MessageAddress agentId; // this agent's address, looked up in load()
096:            private ThreadService threadService; // set by setThreadService, nulled by unload()
097:            private WhitePagesService wps; // set by setWhitePagesService, nulled by unload()
098:
099:            // obtained in load() with "peersClient" as the requestor and
100:            // released in unload()
101:            private PeersService peersService;
102:
103:            // our PeersService callback: relays each peer notification,
104:            // one address at a time, to addPeer / removePeer
105:            private final PeersService.Client peersClient = new PeersService.Client() {
106:                public void add(MessageAddress addr) {
107:                    ServerTransport.this.addPeer(addr);
108:                }
109:
110:                public void addAll(Set s) {
111:                    Iterator it = s.iterator();
112:                    while (it.hasNext()) {
113:                        MessageAddress addr = (MessageAddress) it.next();
114:                        ServerTransport.this.addPeer(addr);
115:                    }
116:                }
117:
118:                public void remove(MessageAddress addr) {
119:                    ServerTransport.this.removePeer(addr);
120:                }
121:
122:                public void removeAll(Set s) {
123:                    Iterator it = s.iterator();
124:                    while (it.hasNext()) {
125:                        MessageAddress addr = (MessageAddress) it.next();
126:                        ServerTransport.this.removePeer(addr);
127:                    }
128:                }
129:            };
121:
122:            private PingAckSP pingAckSP; // providers below: advertised in load(), revoked in unload()
123:            private LookupAckSP lookupAckSP;
124:            private ModifyAckSP modifyAckSP;
125:            private ForwardAckSP forwardAckSP;
126:            private ForwardSP forwardSP;
127:
128:            private RarelyModifiedList pingAckClients = new RarelyModifiedList(); // client lists, one per action (see getList)
129:            private RarelyModifiedList lookupAckClients = new RarelyModifiedList();
130:            private RarelyModifiedList modifyAckClients = new RarelyModifiedList();
131:            private RarelyModifiedList forwardAckClients = new RarelyModifiedList();
132:            private RarelyModifiedList forwardClients = new RarelyModifiedList();
133:
134:            //
135:            // peer servers: updatePeer swaps in a fresh unmodifiable set
136:            // while holding "peersLock"; the old set is never modified
137:
138:            private final Object peersLock = new Object();
139:            private Set peers = Collections.EMPTY_SET;
140:
141:            //
142:            // output (send to WP server):
143:            // sendOrQueue and registerMessageSwitch hold "sendLock"
144:
145:            private final Object sendLock = new Object();
146:
147:            private MessageSwitchService messageSwitchService;
148:
149:            // this is our startup grace-time on message timeouts, which is
150:            // based upon the time we obtained our messageSwitchService plus
151:            // the configuration's "graceMillis".
152:            //
153:            // this is used to allow more delivery time when the system is
154:            // starting, since unusual costs usually occur (e.g. crypto
155:            // handshaking).
156:            private long graceTime;
157:
158:            // messages queued until the messageSwitchService is available
159:            // (null when nothing is queued, see sendOrQueue/flushSendQueue)
160:            // List<WhitePagesMessage> 
161:            private List sendQueue;
162:
163:            //
164:            // input (receive from WP server):
165:            //
166:
167:            private Schedulable receiveThread; // created in load(); its runner calls receiveNow()
168:
169:            // received messages
170:            //
171:            // List<WhitePagesMessage>
172:            private final List receiveQueue = new ArrayList();
173:
174:            // temporary list for use within "receiveNow()"
175:            //
176:            // List<Object>
177:            private final List receiveTmp = new ArrayList();
178:
179:            //
180:            // debug queues:
181:            //
182:
183:            private Schedulable debugThread; // only created if debugQueuesPeriod > 0 and debug logging is on
184:
185:            /**
186:             * Accepts this component's parameter and passes it on to
187:             * {@link #configure(Object)}.
188:             *
189:             * @param o the raw parameter object
190:             */
191:            public void setParameter(Object o) {
192:                this.configure(o);
193:            }
188:
189:            /**
190:             * Builds the transport configuration from the given parameter
191:             * the first time it is called; later calls are no-ops.
192:             *
193:             * @param o raw parameter handed to the ServerTransportConfig
194:             *   constructor (load() passes null)
195:             */
196:            private void configure(Object o) {
197:                if (config == null) {
198:                    config = new ServerTransportConfig(o);
199:                }
200:            }
195:
196:            /**
197:             * Records the service broker that load() and unload() use to
198:             * obtain, advertise and release services.
199:             */
200:            public void setServiceBroker(ServiceBroker sb) {
201:                this.sb = sb;
202:            }
199:
200:            /**
201:             * Records the logging service used by this component; unload()
202:             * releases it and clears the field again.
203:             */
204:            public void setLoggingService(LoggingService logger) {
205:                this.logger = logger;
206:            }
203:
204:            /**
205:             * Records the thread service from which load() and
206:             * registerMessageSwitch() obtain their threads.
207:             */
208:            public void setThreadService(ThreadService threadService) {
209:                this.threadService = threadService;
210:            }
207:
208:            /**
209:             * Records the white pages service that bindServerFlag() uses to
210:             * rebind and unbind this agent's "server" entry.
211:             */
212:            public void setWhitePagesService(WhitePagesService wps) {
213:                this.wps = wps;
214:            }
211:
212:            public void load() {
213:                super .load();
214:
215:                configure(null);
216:
217:                if (logger.isDebugEnabled()) {
218:                    logger.debug("Loading server remote handler");
219:                }
220:
221:                // which agent are we in?
222:                AgentIdentificationService ais = (AgentIdentificationService) sb
223:                        .getService(this , AgentIdentificationService.class,
224:                                null);
225:                agentId = ais.getMessageAddress();
226:                sb.releaseService(this , AgentIdentificationService.class, ais);
227:
228:                // watch for peer servers
229:                peersService = (PeersService) sb.getService(peersClient,
230:                        PeersService.class, null);
231:                if (peersService == null) {
232:                    throw new RuntimeException("Unable to obtain PeersService");
233:                }
234:
235:                // create threads
236:                Runnable receiveRunner = new Runnable() {
237:                    public void run() {
238:                        // assert (thread == receiveThread);
239:                        receiveNow();
240:                    }
241:                };
242:                receiveThread = threadService.getThread(this , receiveRunner,
243:                        "White pages server handle incoming responses");
244:
245:                if (0 < config.debugQueuesPeriod && logger.isDebugEnabled()) {
246:                    Runnable debugRunner = new Runnable() {
247:                        public void run() {
248:                            // assert (thread == debugThread);
249:                            debugQueues();
250:                        }
251:                    };
252:                    debugThread = threadService.getThread(this , debugRunner,
253:                            "White pages server handle outgoing requests");
254:                    debugThread.start();
255:                }
256:
257:                // tell the WP that we're a server
258:                bindServerFlag(true);
259:
260:                // register our message switch (now or later)
261:                if (sb.hasService(MessageSwitchService.class)) {
262:                    registerMessageSwitch();
263:                } else {
264:                    ServiceAvailableListener sal = new ServiceAvailableListener() {
265:                        public void serviceAvailable(ServiceAvailableEvent ae) {
266:                            Class cl = ae.getService();
267:                            if (MessageSwitchService.class.isAssignableFrom(cl)) {
268:                                registerMessageSwitch();
269:                            }
270:                        }
271:                    };
272:                    sb.addServiceListener(sal);
273:                }
274:
275:                // advertise our services
276:                pingAckSP = new PingAckSP();
277:                sb.addService(PingAckService.class, pingAckSP);
278:                lookupAckSP = new LookupAckSP();
279:                sb.addService(LookupAckService.class, lookupAckSP);
280:                modifyAckSP = new ModifyAckSP();
281:                sb.addService(ModifyAckService.class, modifyAckSP);
282:                forwardAckSP = new ForwardAckSP();
283:                sb.addService(ForwardAckService.class, forwardAckSP);
284:                forwardSP = new ForwardSP();
285:                sb.addService(ForwardService.class, forwardSP);
286:            }
287:
288:            public void unload() { // undoes load(): revoke, release, then clear each field
289:                if (forwardSP != null) { // revoke advertised services, last-added first
290:                    sb.revokeService(ForwardService.class, forwardSP);
291:                    forwardSP = null;
292:                }
293:                if (forwardAckSP != null) {
294:                    sb.revokeService(ForwardAckService.class, forwardAckSP);
295:                    forwardAckSP = null;
296:                }
297:                if (modifyAckSP != null) {
298:                    sb.revokeService(ModifyAckService.class, modifyAckSP);
299:                    modifyAckSP = null;
300:                }
301:                if (lookupAckSP != null) {
302:                    sb.revokeService(LookupAckService.class, lookupAckSP);
303:                    lookupAckSP = null;
304:                }
305:                if (pingAckSP != null) {
306:                    sb.revokeService(PingAckService.class, pingAckSP);
307:                    pingAckSP = null;
308:                }
309:
310:                if (messageSwitchService != null) { // our message handler itself is not removed
311:                    //messageSwitchService.removeMessageHandler(myMessageHandler);
312:                    sb.releaseService(this , MessageSwitchService.class,
313:                            messageSwitchService);
314:                    messageSwitchService = null;
315:                }
316:
317:                bindServerFlag(false); // must run before wps and logger are cleared below
318:
319:                if (peersService != null) { // released with the same requestor used in load()
320:                    sb.releaseService(peersClient, PeersService.class,
321:                            peersService);
322:                    peersService = null;
323:                }
324:
325:                if (wps != null) {
326:                    sb.releaseService(this , WhitePagesService.class, wps);
327:                    wps = null;
328:                }
329:                if (threadService != null) {
330:                    // halt our threads?  (receiveThread/debugThread are left as-is)
331:                    sb.releaseService(this , ThreadService.class, threadService);
332:                    threadService = null;
333:                }
334:                if (logger != null) {
335:                    sb.releaseService(this , LoggingService.class, logger);
336:                    logger = null;
337:                }
338:
339:                super .unload();
340:            }
341:
342:            private void bindServerFlag(boolean bind) {
343:                AddressEntry entry = AddressEntry.getAddressEntry(agentId
344:                        .getAddress(), "server", URI.create("server:///true"));
345:                // should really pay attention
346:                final LoggingService ls = logger;
347:                Callback callback = new Callback() {
348:                    public void execute(Response res) {
349:                        if (res.isSuccess()) {
350:                            if (ls.isInfoEnabled()) {
351:                                ls.info("WP Response: " + res);
352:                            }
353:                        } else {
354:                            ls.error("WP Error: " + res);
355:                        }
356:                    }
357:                };
358:                if (bind) {
359:                    wps.rebind(entry, callback);
360:                } else {
361:                    wps.unbind(entry, callback);
362:                }
363:            }
364:
365:            private void addPeer(MessageAddress addr) {
366:                updatePeer(true, addr);
367:            }
368:
369:            private void removePeer(MessageAddress addr) {
370:                updatePeer(false, addr);
371:            }
372:
373:            /**
374:             * Adds or removes a peer server, replacing the "peers" set with a
375:             * modified copy while holding "peersLock".
376:             * <p>
377:             * A null address is ignored, as is an add of a known peer or a
378:             * remove of an unknown one (both are logged at info level).
379:             *
380:             * @param add true to add the peer, false to remove it
381:             * @param addr the peer; only its primary address is recorded
382:             */
383:            private void updatePeer(boolean add, MessageAddress addr) {
384:                if (addr == null) {
385:                    return;
386:                }
387:                synchronized (peersLock) {
388:                    MessageAddress primary = addr.getPrimary();
389:                    boolean known = peers.contains(primary);
390:                    if (known == add) {
391:                        // nothing to change
392:                        if (logger.isInfoEnabled()) {
393:                            String op = (add ? "add" : "remove");
394:                            String state = (add ? "already" : "not");
395:                            logger.info("Ignoring " + op + " of peer " + primary
396:                                    + " that is " + state + " in our peers["
397:                                    + peers.size() + "]=" + peers);
398:                        }
399:                        return;
400:                    }
401:                    // copy-on-write
402:                    Set updated = new HashSet(peers);
403:                    if (add) {
404:                        updated.add(primary);
405:                    } else {
406:                        updated.remove(primary);
407:                    }
408:                    peers = Collections.unmodifiableSet(updated);
409:                    if (logger.isInfoEnabled()) {
410:                        String verb = (add ? "Added" : "Removed");
411:                        String prep = (add ? "to" : "from");
412:                        logger.info(verb + " peer server " + primary + " " + prep
413:                                + " peers[" + peers.size() + "]=" + peers);
414:                    }
415:                }
416:                // TODO on "add" we should forward old messages within the
417:                // expire ttd, but this would require help from the server
418:                // tables.  For now we'll ignore this case and let the next
419:                // "forward" take care of new peers.
420:            }
409:
410:            /**
411:             * Returns the current peers set, read while holding "peersLock".
412:             * updatePeer only ever installs unmodifiable sets.
413:             */
414:            private Set getPeers() {
415:                Set current;
416:                synchronized (peersLock) {
417:                    current = peers;
418:                }
419:                return current;
420:            }
415:
416:            /**
417:             * Tests whether an address belongs to a remote peer server.
418:             *
419:             * @param addr the address to test, may be null
420:             * @return false for null or for our own agent, otherwise whether
421:             *   the primary address is in the current peers set
422:             */
423:            private boolean isPeer(MessageAddress addr) {
424:                if (addr == null) {
425:                    return false;
426:                }
427:                MessageAddress primary = addr.getPrimary();
428:                // the local server never counts as a peer
429:                return !agentId.equals(primary) && getPeers().contains(primary);
430:            }
426:
427:            private void registerMessageSwitch() {
428:                // service broker now has the MessageSwitchService: obtain it,
429:                // install our receive handler, start the grace period and
430:                // flush queued sends.  should we do this in a separate thread?
431:                if (messageSwitchService != null) { // already registered once
432:                    if (logger.isErrorEnabled()) {
433:                        logger.error("Already obtained our message switch");
434:                    }
435:                    return;
436:                }
437:                MessageSwitchService mss = (MessageSwitchService) sb
438:                        .getService(this , MessageSwitchService.class, null);
439:                if (mss == null) { // the broker did not hand us the service
440:                    if (logger.isErrorEnabled()) {
441:                        logger.error("Unable to obtain MessageSwitchService");
442:                    }
443:                    return;
444:                }
445:                MessageHandler myMessageHandler = new MessageHandler() {
446:                    public boolean handleMessage(Message m) {
447:                        return receive(m);
448:                    }
449:                };
450:                mss.addMessageHandler(myMessageHandler);
451:                if (logger.isInfoEnabled()) {
452:                    logger.info("Registered server message handler");
453:                }
454:                synchronized (sendLock) {
455:                    this .messageSwitchService = mss; // from now on sendOrQueue sends directly
456:                    if (0 <= config.graceMillis) { // a negative graceMillis leaves graceTime unset
457:                        this .graceTime = System.currentTimeMillis()
458:                                + config.graceMillis;
459:                    }
460:                    if (sendQueue != null) {
461:                        // send queued messages
462:                        // (on a separate thread that takes sendLock itself)
463:                        Runnable flushSendQueueRunner = new Runnable() {
464:                            public void run() {
465:                                synchronized (sendLock) {
466:                                    flushSendQueue();
467:                                }
468:                            }
469:                        };
470:                        Schedulable flushSendQueueThread = threadService
471:                                .getThread(this , flushSendQueueRunner,
472:                                        "White pages server flush queued output messages");
473:                        flushSendQueueThread.start();
474:                        // this may race with the normal message-send code,
475:                        // so we also check the sendQueue there.  This means
476:                        // that the above "flushSendQueue()" call may find a
477:                        // null sendQueue by the time it is run.
478:                    }
479:                }
480:            }
481:
482:            /**
483:             * Selects the client list for an action code.
484:             *
485:             * @param action WPAnswer.LOOKUP, MODIFY, FORWARD or PING, or the
486:             *   local FORWARD_ANSWER code
487:             * @return the matching client list, or null for any other action
488:             */
489:            private List getList(int action) {
490:                if (action == WPAnswer.LOOKUP) {
491:                    return lookupAckClients;
492:                }
493:                if (action == WPAnswer.MODIFY) {
494:                    return modifyAckClients;
495:                }
496:                if (action == WPAnswer.FORWARD) {
497:                    return forwardAckClients;
498:                }
499:                if (action == WPAnswer.PING) {
500:                    return pingAckClients;
501:                }
502:                if (action == FORWARD_ANSWER) {
503:                    return forwardClients;
504:                }
505:                return null;
506:            }
490:
491:            /**
492:             * Adds a client to the list selected by getList(action).  An
493:             * unknown action yields a null list and so a NullPointerException.
494:             */
495:            private void register(int action, Object c) {
496:                List clients = getList(action);
497:                clients.add(c);
498:            }
494:
495:            /**
496:             * Removes a client from the list selected by getList(action).  An
497:             * unknown action yields a null list and so a NullPointerException.
498:             */
499:            private void unregister(int action, Object c) {
500:                List clients = getList(action);
501:                clients.remove(c);
502:            }
498:
499:            /**
500:             * Hands a received map to every client registered for the action.
501:             * <p>
502:             * A null or empty map is dropped unless the action is a ping; an
503:             * unrecognized action is logged as an error.
504:             *
505:             * @param action the WPAnswer action code, or FORWARD_ANSWER
506:             * @param clientAddr originator of the message
507:             * @param clientTime timestamp taken from the message
508:             * @param m the message content, may be null
509:             */
510:            private void tellClients(int action, MessageAddress clientAddr,
511:                    long clientTime, Map m) {
512:                // tell our clients (refactor me?)
513:                boolean emptyMap = (m == null || m.size() == 0);
514:                if (emptyMap && action != WPAnswer.PING) {
515:                    return;
516:                }
517:                if (action == WPAnswer.LOOKUP) {
518:                    List clients = lookupAckClients.getUnmodifiableList();
519:                    int count = clients.size();
520:                    for (int i = 0; i < count; i++) {
521:                        Object o = clients.get(i);
522:                        ((LookupAckService.Client) o).lookup(clientAddr,
523:                                clientTime, m);
524:                    }
525:                    return;
526:                }
527:                if (action == WPAnswer.MODIFY) {
528:                    List clients = modifyAckClients.getUnmodifiableList();
529:                    int count = clients.size();
530:                    for (int i = 0; i < count; i++) {
531:                        Object o = clients.get(i);
532:                        ((ModifyAckService.Client) o).modify(clientAddr,
533:                                clientTime, m);
534:                    }
535:                    return;
536:                }
537:                if (action == WPAnswer.FORWARD) {
538:                    List clients = forwardAckClients.getUnmodifiableList();
539:                    int count = clients.size();
540:                    for (int i = 0; i < count; i++) {
541:                        Object o = clients.get(i);
542:                        ((ForwardAckService.Client) o).forward(clientAddr,
543:                                clientTime, m);
544:                    }
545:                    return;
546:                }
547:                if (action == WPAnswer.PING) {
548:                    List clients = pingAckClients.getUnmodifiableList();
549:                    int count = clients.size();
550:                    for (int i = 0; i < count; i++) {
551:                        Object o = clients.get(i);
552:                        ((PingAckService.Client) o).ping(clientAddr,
553:                                clientTime, m);
554:                    }
555:                    return;
556:                }
557:                if (action == FORWARD_ANSWER) {
558:                    List clients = forwardClients.getUnmodifiableList();
559:                    int count = clients.size();
560:                    for (int i = 0; i < count; i++) {
561:                        Object o = clients.get(i);
562:                        ((ForwardService.Client) o).forwardAnswer(clientAddr,
563:                                clientTime, m);
564:                    }
565:                    return;
566:                }
567:                if (logger.isErrorEnabled()) {
568:                    logger.error("Unknown action " + action);
569:                }
570:            }
545:
546:            /**
547:             * Builds a WPAnswer for a client and sends it (or queues it until
548:             * the message switch is available).
549:             * <p>
550:             * Nothing is sent for a null or empty map unless the action is a
551:             * ping, nor for a FORWARD answer whose target is not a peer.
552:             *
553:             * @param action the WPAnswer action code
554:             * @param clientAddr where to send the answer
555:             * @param clientTime the client's timestamp, passed to the WPAnswer
556:             * @param m the answer content
557:             */
558:            private void send(int action, MessageAddress clientAddr,
559:                    long clientTime, Map m) {
560:                boolean nothingToSend = (m == null || m.isEmpty());
561:                if (nothingToSend && action != WPAnswer.PING) {
562:                    return;
563:                }
564:                if (action == WPAnswer.FORWARD && !isPeer(clientAddr)) {
565:                    // ignore, either the local server or a non-peer
566:                    return;
567:                }
568:
569:                long now = System.currentTimeMillis();
570:
571:                // lookups and pings have their own timeouts, every other
572:                // action uses the modify timeout
573:                long timeout;
574:                if (action == WPAnswer.LOOKUP) {
575:                    timeout = config.lookupTimeoutMillis;
576:                } else if (action == WPAnswer.PING) {
577:                    timeout = config.pingTimeoutMillis;
578:                } else {
579:                    timeout = config.modifyTimeoutMillis;
580:                }
581:                if (0 < timeout && 0 < graceTime) {
582:                    // stretch the timeout to the end of the startup grace
583:                    // period, if that is further away
584:                    long untilGrace = graceTime - now;
585:                    if (0 < untilGrace && timeout < untilGrace) {
586:                        timeout = untilGrace;
587:                    }
588:                }
589:
590:                // tag with optional timeout attribute
591:                long deadline = now + timeout;
592:                MessageAddress target = MessageTimeoutUtils.setDeadline(
593:                        clientAddr, deadline);
594:
595:                WPAnswer answer = new WPAnswer(agentId, target, clientTime, now,
596:                        USE_SERVER_TIME, action, m);
597:                sendOrQueue(answer);
598:            }
578:
579:            /**
580:             * Sends a WPQuery.FORWARD with the given content to every peer
581:             * server except ourselves.
582:             *
583:             * @param m the content to forward
584:             * @param ttd added to the current time to form each message's
585:             *   deadline
586:             */
587:            private void forward(Map m, long ttd) {
588:                Set targets = getPeers();
589:                int n = targets.size();
590:                if (logger.isDetailEnabled()) {
591:                    logger.detail("forwarding " + m + " to all peers[" + n
592:                            + "]=" + targets + " except ourselves(" + agentId
593:                            + ")");
594:                }
595:                long now = System.currentTimeMillis();
596:                long deadline = now + ttd;
597:                Iterator it = targets.iterator();
598:                while (it.hasNext()) {
599:                    MessageAddress peer = (MessageAddress) it.next();
600:                    if (!agentId.equals(peer.getPrimary())) {
601:                        // send to this target (the local server is skipped)
602:                        MessageAddress target = MessageTimeoutUtils.setDeadline(
603:                                peer, deadline);
604:                        WPQuery wpq = new WPQuery(agentId, target, now,
605:                                WPQuery.FORWARD, m);
606:                        sendOrQueue(wpq);
607:                    }
608:                }
609:            }
603:
604:            /**
605:             * Sends a WPQuery.FORWARD with the given content to a single peer.
606:             * Addresses that are not peers (including our own) are ignored.
607:             *
608:             * @param addr the peer to send to
609:             * @param m the content to forward
610:             * @param ttd added to the current time to form the deadline
611:             */
612:            private void forward(MessageAddress addr, Map m, long ttd) {
613:                if (isPeer(addr)) {
614:                    // send to this target
615:                    long now = System.currentTimeMillis();
616:                    MessageAddress target = MessageTimeoutUtils.setDeadline(addr,
617:                            now + ttd);
618:                    sendOrQueue(new WPQuery(agentId, target, now,
619:                            WPQuery.FORWARD, m));
620:                }
621:                // otherwise ignore, either the local server or a non-peer
622:            }
618:
619:            /**
620:             * Sends a message now if we have the message switch, otherwise
621:             * queues it.  Runs entirely under "sendLock"; messages still
622:             * queued from before the switch arrived are flushed first.
623:             *
624:             * @param m the message to send
625:             */
626:            private void sendOrQueue(WhitePagesMessage m) {
627:                synchronized (sendLock) {
628:                    if (messageSwitchService == null) {
629:                        // queue to send once the MTS is up
630:                        if (sendQueue == null) {
631:                            sendQueue = new ArrayList();
632:                        }
633:                        sendQueue.add(m);
634:                        return;
635:                    }
636:                    if (sendQueue != null) {
637:                        // flush pending messages (not the typical case)
638:                        flushSendQueue();
639:                    }
640:                    send(m);
641:                }
642:            }
637:
638:            /**
639:             * Hands a message to the message switch, logging it at detail
640:             * level first.  Callers hold "sendLock" and have already checked
641:             * that the message switch is available.
642:             */
643:            private void send(WhitePagesMessage m) {
644:                // assert (Thread.holdsLock(sendLock));
645:                // assert (messageSwitchService != null);
646:                boolean detail = logger.isDetailEnabled();
647:                if (detail) {
648:                    logger.detail("sending message: " + m);
649:                }
650:                messageSwitchService.sendMessage(m);
651:            }
646:
647:            private void flushSendQueue() {
648:                // assert (Thread.holdsLock(sendLock));
649:                // assert (messageSwitchService != null);
650:                List l = sendQueue;
651:                sendQueue = null;
652:                int n = (l == null ? 0 : l.size());
653:                if (n != 0) {
654:                    // must drain in reverse order, since we appended
655:                    // to the end.
656:                    for (int i = n - 1; 0 <= i; i--) {
657:                        WhitePagesMessage m = (WhitePagesMessage) l.get(i);
658:                        send(m);
659:                    }
660:                }
661:            }
662:
663:            private void receiveNow(WhitePagesMessage wpm) {
664:                if (logger.isDetailEnabled()) {
665:                    logger.detail("receiving message: " + wpm);
666:                }
667:
668:                MessageAddress clientAddr = wpm.getOriginator();
669:
670:                Map m;
671:                long clientTime;
672:                int action;
673:                if (wpm instanceof  WPQuery) {
674:                    WPQuery wpq = (WPQuery) wpm;
675:                    m = wpq.getMap();
676:                    clientTime = wpq.getSendTime();
677:                    action = wpq.getAction();
678:                } else {
679:                    WPAnswer wpa = (WPAnswer) wpm;
680:                    m = wpa.getMap();
681:                    clientTime = wpa.getReplyTime();
682:                    action = FORWARD_ANSWER;
683:                }
684:
685:                tellClients(action, clientAddr, clientTime, m);
686:            }
687:
688:            //
689:            // message receive queue
690:            //
691:
692:            private boolean receive(Message m) {
693:                if (m instanceof  WPQuery) {
694:                    // match
695:                } else if (m instanceof  WPAnswer) {
696:                    if (((WPAnswer) m).getAction() != WPAnswer.FORWARD) {
697:                        return false;
698:                    }
699:                } else {
700:                    return false;
701:                }
702:                WhitePagesMessage wpm = (WhitePagesMessage) m;
703:                receiveLater(wpm);
704:                return true;
705:            }
706:
707:            private void receiveLater(WhitePagesMessage m) {
708:                // queue to run in our thread
709:                synchronized (receiveQueue) {
710:                    receiveQueue.add(m);
711:                }
712:                receiveThread.start();
713:            }
714:
715:            private void receiveNow() {
716:                synchronized (receiveQueue) {
717:                    if (receiveQueue.isEmpty()) {
718:                        if (logger.isDetailEnabled()) {
719:                            logger.detail("input queue is empty");
720:                        }
721:                        return;
722:                    }
723:                    receiveTmp.addAll(receiveQueue);
724:                    receiveQueue.clear();
725:                }
726:                // receive messages
727:                for (int i = 0, n = receiveTmp.size(); i < n; i++) {
728:                    WhitePagesMessage m = (WhitePagesMessage) receiveTmp.get(i);
729:                    receiveNow(m);
730:                }
731:                receiveTmp.clear();
732:            }
733:
734:            private void debugQueues() {
735:                if (!logger.isDebugEnabled()) {
736:                    return;
737:                }
738:
739:                synchronized (receiveQueue) {
740:                    String s = "";
741:                    s += "\n##### server transport input queue ################";
742:                    int n = receiveQueue.size();
743:                    s += "\nreceive[" + n + "]: ";
744:                    for (int i = 0; i < n; i++) {
745:                        WhitePagesMessage m = (WhitePagesMessage) receiveQueue
746:                                .get(i);
747:                        s += "\n   " + m;
748:                    }
749:                    s += "\n###################################################";
750:                    logger.debug(s);
751:                }
752:
753:                synchronized (sendLock) {
754:                    String s = "";
755:                    s += "\n##### server transport output queue ###############";
756:                    s += "\nmessageSwitchService=" + messageSwitchService;
757:                    int n = (sendQueue == null ? 0 : sendQueue.size());
758:                    s += "\nsendQueue[" + n + "]: " + sendQueue;
759:                    s += "\n###################################################";
760:                    logger.debug(s);
761:                }
762:
763:                // run me again later
764:                debugThread.schedule(config.debugQueuesPeriod);
765:            }
766:
767:            private abstract class SPBase extends ServiceProviderBase {
768:                protected abstract int getAction();
769:
770:                protected void register(Object client) {
771:                    ServerTransport.this .register(getAction(), client);
772:                }
773:
774:                protected void unregister(Object client) {
775:                    ServerTransport.this .unregister(getAction(), client);
776:                }
777:            }
778:
779:            private class PingAckSP extends SPBase {
780:                protected int getAction() {
781:                    return WPAnswer.PING;
782:                }
783:
784:                protected Class getServiceClass() {
785:                    return PingAckService.class;
786:                }
787:
788:                protected Class getClientClass() {
789:                    return PingAckService.Client.class;
790:                }
791:
792:                protected Service getService(Object client) {
793:                    return new SI(client);
794:                }
795:
796:                protected class SI extends MyServiceImpl implements 
797:                        PingAckService {
798:                    public SI(Object client) {
799:                        super (client);
800:                    }
801:
802:                    public void pingAnswer(MessageAddress clientAddr,
803:                            long clientTime, Map m) {
804:                        ServerTransport.this .send(WPAnswer.PING, clientAddr,
805:                                clientTime, m);
806:                    }
807:                }
808:            }
809:
810:            private class LookupAckSP extends SPBase {
811:                protected int getAction() {
812:                    return WPAnswer.LOOKUP;
813:                }
814:
815:                protected Class getServiceClass() {
816:                    return LookupAckService.class;
817:                }
818:
819:                protected Class getClientClass() {
820:                    return LookupAckService.Client.class;
821:                }
822:
823:                protected Service getService(Object client) {
824:                    return new SI(client);
825:                }
826:
827:                protected class SI extends MyServiceImpl implements 
828:                        LookupAckService {
829:                    public SI(Object client) {
830:                        super (client);
831:                    }
832:
833:                    public void lookupAnswer(MessageAddress clientAddr,
834:                            long clientTime, Map m) {
835:                        ServerTransport.this .send(WPAnswer.LOOKUP, clientAddr,
836:                                clientTime, m);
837:                    }
838:                }
839:            }
840:
841:            private class ModifyAckSP extends SPBase {
842:                protected int getAction() {
843:                    return WPAnswer.MODIFY;
844:                }
845:
846:                protected Class getServiceClass() {
847:                    return ModifyAckService.class;
848:                }
849:
850:                protected Class getClientClass() {
851:                    return ModifyAckService.Client.class;
852:                }
853:
854:                protected Service getService(Object client) {
855:                    return new SI(client);
856:                }
857:
858:                protected class SI extends MyServiceImpl implements 
859:                        ModifyAckService {
860:                    public SI(Object client) {
861:                        super (client);
862:                    }
863:
864:                    public void modifyAnswer(MessageAddress clientAddr,
865:                            long clientTime, Map m) {
866:                        ServerTransport.this .send(WPAnswer.MODIFY, clientAddr,
867:                                clientTime, m);
868:                    }
869:                }
870:            }
871:
872:            private class ForwardAckSP extends SPBase {
873:                protected int getAction() {
874:                    return WPAnswer.FORWARD;
875:                }
876:
877:                protected Class getServiceClass() {
878:                    return ForwardAckService.class;
879:                }
880:
881:                protected Class getClientClass() {
882:                    return ForwardAckService.Client.class;
883:                }
884:
885:                protected Service getService(Object client) {
886:                    return new SI(client);
887:                }
888:
889:                protected class SI extends MyServiceImpl implements 
890:                        ForwardAckService {
891:                    public SI(Object client) {
892:                        super (client);
893:                    }
894:
895:                    public void forwardAnswer(MessageAddress clientAddr,
896:                            long clientTime, Map m) {
897:                        ServerTransport.this .send(WPAnswer.FORWARD, clientAddr,
898:                                clientTime, m);
899:                    }
900:                }
901:            }
902:
903:            private class ForwardSP extends SPBase {
904:                protected int getAction() {
905:                    return FORWARD_ANSWER;
906:                }
907:
908:                protected Class getServiceClass() {
909:                    return ForwardService.class;
910:                }
911:
912:                protected Class getClientClass() {
913:                    return ForwardService.Client.class;
914:                }
915:
916:                protected Service getService(Object client) {
917:                    return new SI(client);
918:                }
919:
920:                protected class SI extends MyServiceImpl implements 
921:                        ForwardService {
922:                    public SI(Object client) {
923:                        super (client);
924:                    }
925:
926:                    public void forward(Map m, long ttd) {
927:                        ServerTransport.this .forward(m, ttd);
928:                    }
929:
930:                    public void forward(MessageAddress target, Map m, long ttd) {
931:                        ServerTransport.this .forward(target, m, ttd);
932:                    }
933:                }
934:            }
935:
936:            /** config options */
937:            private static class ServerTransportConfig {
938:                public final long debugQueuesPeriod;
939:                public final long graceMillis;
940:                // these should match the server TTLs
941:                public final long lookupTimeoutMillis;
942:                public final long modifyTimeoutMillis;
943:                public final long pingTimeoutMillis;
944:
945:                public ServerTransportConfig(Object o) {
946:                    Parameters p = new Parameters(o,
947:                            "org.cougaar.core.wp.server.");
948:                    debugQueuesPeriod = p.getLong("debugQueuesPeriod", 30000);
949:                    graceMillis = p.getLong("graceMillis", 0);
950:                    lookupTimeoutMillis = p.getLong("lookupTimeoutMillis",
951:                            90000);
952:                    modifyTimeoutMillis = p.getLong("modifyTimeoutMillis",
953:                            90000);
954:                    pingTimeoutMillis = p.getLong("pingTimeoutMillis", 90000);
955:                }
956:            }
957:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.