001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.wp.server;
028:
029: import java.net.URI;
030: import java.util.ArrayList;
031: import java.util.Collection;
032: import java.util.Collections;
033: import java.util.HashSet;
034: import java.util.Iterator;
035: import java.util.List;
036: import java.util.Map;
037: import java.util.Set;
038: import org.cougaar.bootstrap.SystemProperties;
039: import org.cougaar.core.agent.service.MessageSwitchService;
040: import org.cougaar.core.component.Component;
041: import org.cougaar.core.component.Service;
042: import org.cougaar.core.component.ServiceAvailableEvent;
043: import org.cougaar.core.component.ServiceAvailableListener;
044: import org.cougaar.core.component.ServiceBroker;
045: import org.cougaar.core.component.ServiceListener;
046: import org.cougaar.core.component.ServiceProvider;
047: import org.cougaar.core.component.ServiceRevokedListener;
048: import org.cougaar.core.mts.Message;
049: import org.cougaar.core.mts.MessageAddress;
050: import org.cougaar.core.mts.MessageHandler;
051: import org.cougaar.core.service.AgentIdentificationService;
052: import org.cougaar.core.service.LoggingService;
053: import org.cougaar.core.service.ThreadService;
054: import org.cougaar.core.service.wp.AddressEntry;
055: import org.cougaar.core.service.wp.Callback;
056: import org.cougaar.core.service.wp.Response;
057: import org.cougaar.core.service.wp.WhitePagesService;
058: import org.cougaar.core.thread.Schedulable;
059: import org.cougaar.core.wp.MessageTimeoutUtils;
060: import org.cougaar.core.wp.Parameters;
061: import org.cougaar.core.wp.WhitePagesMessage;
062: import org.cougaar.core.wp.bootstrap.PeersService;
063: import org.cougaar.core.wp.resolver.ServiceProviderBase;
064: import org.cougaar.core.wp.resolver.WPAnswer;
065: import org.cougaar.core.wp.resolver.WPQuery;
066: import org.cougaar.util.GenericStateModelAdapter;
067: import org.cougaar.util.RarelyModifiedList;
068:
069: /**
070: * This component sends and receives messages for the {@link
071: * RootAuthority}.
072: * <p>
073: * This component is responsible for the server-side hierarchy
074: * traversal and replication.
075: */
076: public class ServerTransport extends GenericStateModelAdapter implements
077: Component {
078:
079: /**
080: * Should timestamps be relative to the server's clock or
081: * the client's measured round-trip-time?
082: *
083: * @see WPAnswer
084: */
085: private final boolean USE_SERVER_TIME = SystemProperties
086: .getBoolean("org.cougaar.core.wp.server.useServerTime");
087:
088: // pick an action that doesn't conflict with WPQuery
089: private static final int FORWARD_ANSWER = 4;
090:
091: private ServerTransportConfig config;
092:
093: private ServiceBroker sb;
094: private LoggingService logger;
095: private MessageAddress agentId;
096: private ThreadService threadService;
097: private WhitePagesService wps;
098:
099: private PeersService peersService;
100: private final PeersService.Client peersClient = new PeersService.Client() {
101: public void add(MessageAddress addr) {
102: ServerTransport.this .addPeer(addr);
103: }
104:
105: public void addAll(Set s) {
106: for (Iterator iter = s.iterator(); iter.hasNext();) {
107: add((MessageAddress) iter.next());
108: }
109: }
110:
111: public void remove(MessageAddress addr) {
112: ServerTransport.this .removePeer(addr);
113: }
114:
115: public void removeAll(Set s) {
116: for (Iterator iter = s.iterator(); iter.hasNext();) {
117: remove((MessageAddress) iter.next());
118: }
119: }
120: };
121:
122: private PingAckSP pingAckSP;
123: private LookupAckSP lookupAckSP;
124: private ModifyAckSP modifyAckSP;
125: private ForwardAckSP forwardAckSP;
126: private ForwardSP forwardSP;
127:
128: private RarelyModifiedList pingAckClients = new RarelyModifiedList();
129: private RarelyModifiedList lookupAckClients = new RarelyModifiedList();
130: private RarelyModifiedList modifyAckClients = new RarelyModifiedList();
131: private RarelyModifiedList forwardAckClients = new RarelyModifiedList();
132: private RarelyModifiedList forwardClients = new RarelyModifiedList();
133:
134: //
135: // peer servers
136: //
137:
138: private final Object peersLock = new Object();
139: private Set peers = Collections.EMPTY_SET;
140:
141: //
142: // output (send to WP server):
143: //
144:
145: private final Object sendLock = new Object();
146:
147: private MessageSwitchService messageSwitchService;
148:
149: // this is our startup grace-time on message timeouts, which is
150: // based upon the time we obtained our messageSwitchService plus
151: // the configuration's "graceMillis".
152: //
153: // this is used to allow more delivery time when the system is
154: // starting, since unusual costs usually occur (e.g. crypto
155: // handshaking).
156: private long graceTime;
157:
158: // messages queued until the messageSwitchService is available
159: //
160: // List<WhitePagesMessage>
161: private List sendQueue;
162:
163: //
164: // input (receive from WP server):
165: //
166:
167: private Schedulable receiveThread;
168:
169: // received messages
170: //
171: // List<WhitePagesMessage>
172: private final List receiveQueue = new ArrayList();
173:
174: // temporary list for use within "receiveNow()"
175: //
176: // List<Object>
177: private final List receiveTmp = new ArrayList();
178:
179: //
180: // debug queues:
181: //
182:
183: private Schedulable debugThread;
184:
185: public void setParameter(Object o) {
186: configure(o);
187: }
188:
189: private void configure(Object o) {
190: if (config != null) {
191: return;
192: }
193: config = new ServerTransportConfig(o);
194: }
195:
196: public void setServiceBroker(ServiceBroker sb) {
197: this .sb = sb;
198: }
199:
200: public void setLoggingService(LoggingService logger) {
201: this .logger = logger;
202: }
203:
204: public void setThreadService(ThreadService threadService) {
205: this .threadService = threadService;
206: }
207:
208: public void setWhitePagesService(WhitePagesService wps) {
209: this .wps = wps;
210: }
211:
212: public void load() {
213: super .load();
214:
215: configure(null);
216:
217: if (logger.isDebugEnabled()) {
218: logger.debug("Loading server remote handler");
219: }
220:
221: // which agent are we in?
222: AgentIdentificationService ais = (AgentIdentificationService) sb
223: .getService(this , AgentIdentificationService.class,
224: null);
225: agentId = ais.getMessageAddress();
226: sb.releaseService(this , AgentIdentificationService.class, ais);
227:
228: // watch for peer servers
229: peersService = (PeersService) sb.getService(peersClient,
230: PeersService.class, null);
231: if (peersService == null) {
232: throw new RuntimeException("Unable to obtain PeersService");
233: }
234:
235: // create threads
236: Runnable receiveRunner = new Runnable() {
237: public void run() {
238: // assert (thread == receiveThread);
239: receiveNow();
240: }
241: };
242: receiveThread = threadService.getThread(this , receiveRunner,
243: "White pages server handle incoming responses");
244:
245: if (0 < config.debugQueuesPeriod && logger.isDebugEnabled()) {
246: Runnable debugRunner = new Runnable() {
247: public void run() {
248: // assert (thread == debugThread);
249: debugQueues();
250: }
251: };
252: debugThread = threadService.getThread(this , debugRunner,
253: "White pages server handle outgoing requests");
254: debugThread.start();
255: }
256:
257: // tell the WP that we're a server
258: bindServerFlag(true);
259:
260: // register our message switch (now or later)
261: if (sb.hasService(MessageSwitchService.class)) {
262: registerMessageSwitch();
263: } else {
264: ServiceAvailableListener sal = new ServiceAvailableListener() {
265: public void serviceAvailable(ServiceAvailableEvent ae) {
266: Class cl = ae.getService();
267: if (MessageSwitchService.class.isAssignableFrom(cl)) {
268: registerMessageSwitch();
269: }
270: }
271: };
272: sb.addServiceListener(sal);
273: }
274:
275: // advertise our services
276: pingAckSP = new PingAckSP();
277: sb.addService(PingAckService.class, pingAckSP);
278: lookupAckSP = new LookupAckSP();
279: sb.addService(LookupAckService.class, lookupAckSP);
280: modifyAckSP = new ModifyAckSP();
281: sb.addService(ModifyAckService.class, modifyAckSP);
282: forwardAckSP = new ForwardAckSP();
283: sb.addService(ForwardAckService.class, forwardAckSP);
284: forwardSP = new ForwardSP();
285: sb.addService(ForwardService.class, forwardSP);
286: }
287:
288: public void unload() {
289: if (forwardSP != null) {
290: sb.revokeService(ForwardService.class, forwardSP);
291: forwardSP = null;
292: }
293: if (forwardAckSP != null) {
294: sb.revokeService(ForwardAckService.class, forwardAckSP);
295: forwardAckSP = null;
296: }
297: if (modifyAckSP != null) {
298: sb.revokeService(ModifyAckService.class, modifyAckSP);
299: modifyAckSP = null;
300: }
301: if (lookupAckSP != null) {
302: sb.revokeService(LookupAckService.class, lookupAckSP);
303: lookupAckSP = null;
304: }
305: if (pingAckSP != null) {
306: sb.revokeService(PingAckService.class, pingAckSP);
307: pingAckSP = null;
308: }
309:
310: if (messageSwitchService != null) {
311: //messageSwitchService.removeMessageHandler(myMessageHandler);
312: sb.releaseService(this , MessageSwitchService.class,
313: messageSwitchService);
314: messageSwitchService = null;
315: }
316:
317: bindServerFlag(false);
318:
319: if (peersService != null) {
320: sb.releaseService(peersClient, PeersService.class,
321: peersService);
322: peersService = null;
323: }
324:
325: if (wps != null) {
326: sb.releaseService(this , WhitePagesService.class, wps);
327: wps = null;
328: }
329: if (threadService != null) {
330: // halt our threads?
331: sb.releaseService(this , ThreadService.class, threadService);
332: threadService = null;
333: }
334: if (logger != null) {
335: sb.releaseService(this , LoggingService.class, logger);
336: logger = null;
337: }
338:
339: super .unload();
340: }
341:
342: private void bindServerFlag(boolean bind) {
343: AddressEntry entry = AddressEntry.getAddressEntry(agentId
344: .getAddress(), "server", URI.create("server:///true"));
345: // should really pay attention
346: final LoggingService ls = logger;
347: Callback callback = new Callback() {
348: public void execute(Response res) {
349: if (res.isSuccess()) {
350: if (ls.isInfoEnabled()) {
351: ls.info("WP Response: " + res);
352: }
353: } else {
354: ls.error("WP Error: " + res);
355: }
356: }
357: };
358: if (bind) {
359: wps.rebind(entry, callback);
360: } else {
361: wps.unbind(entry, callback);
362: }
363: }
364:
365: private void addPeer(MessageAddress addr) {
366: updatePeer(true, addr);
367: }
368:
369: private void removePeer(MessageAddress addr) {
370: updatePeer(false, addr);
371: }
372:
373: private void updatePeer(boolean add, MessageAddress addr) {
374: if (addr == null) {
375: return;
376: }
377: synchronized (peersLock) {
378: MessageAddress a = addr.getPrimary();
379: if (add == peers.contains(a)) {
380: if (logger.isInfoEnabled()) {
381: logger.info("Ignoring " + (add ? "add" : "remove")
382: + " of peer " + a + " that is "
383: + (add ? "already" : "not")
384: + " in our peers[" + peers.size() + "]="
385: + peers);
386: }
387: return;
388: }
389: // copy-on-write
390: Set np = new HashSet(peers);
391: if (add) {
392: np.add(a);
393: } else {
394: np.remove(a);
395: }
396: peers = Collections.unmodifiableSet(np);
397: if (logger.isInfoEnabled()) {
398: logger.info((add ? "Added" : "Removed")
399: + " peer server " + a + " "
400: + (add ? "to" : "from") + " peers["
401: + peers.size() + "]=" + peers);
402: }
403: }
404: // TODO on "add" we should forward old messages within the
405: // expire ttd, but this would require help from the server
406: // tables. For now we'll ignore this case and let the next
407: // "forward" take care of new peers.
408: }
409:
410: private Set getPeers() {
411: synchronized (peersLock) {
412: return peers;
413: }
414: }
415:
416: private boolean isPeer(MessageAddress addr) {
417: if (addr == null) {
418: return false;
419: }
420: MessageAddress a = addr.getPrimary();
421: if (agentId.equals(a)) {
422: return false;
423: }
424: return getPeers().contains(a);
425: }
426:
427: private void registerMessageSwitch() {
428: // service broker now has the MessageSwitchService
429: //
430: // should we do this in a separate thread?
431: if (messageSwitchService != null) {
432: if (logger.isErrorEnabled()) {
433: logger.error("Already obtained our message switch");
434: }
435: return;
436: }
437: MessageSwitchService mss = (MessageSwitchService) sb
438: .getService(this , MessageSwitchService.class, null);
439: if (mss == null) {
440: if (logger.isErrorEnabled()) {
441: logger.error("Unable to obtain MessageSwitchService");
442: }
443: return;
444: }
445: MessageHandler myMessageHandler = new MessageHandler() {
446: public boolean handleMessage(Message m) {
447: return receive(m);
448: }
449: };
450: mss.addMessageHandler(myMessageHandler);
451: if (logger.isInfoEnabled()) {
452: logger.info("Registered server message handler");
453: }
454: synchronized (sendLock) {
455: this .messageSwitchService = mss;
456: if (0 <= config.graceMillis) {
457: this .graceTime = System.currentTimeMillis()
458: + config.graceMillis;
459: }
460: if (sendQueue != null) {
461: // send queued messages
462: //
463: Runnable flushSendQueueRunner = new Runnable() {
464: public void run() {
465: synchronized (sendLock) {
466: flushSendQueue();
467: }
468: }
469: };
470: Schedulable flushSendQueueThread = threadService
471: .getThread(this , flushSendQueueRunner,
472: "White pages server flush queued output messages");
473: flushSendQueueThread.start();
474: // this may race with the normal message-send code,
475: // so we also check the sendQueue there. This means
476: // that the above "flushSendQueue()" call may find a
477: // null sendQueue by the time it is run.
478: }
479: }
480: }
481:
482: private List getList(int action) {
483: return (action == WPAnswer.LOOKUP ? lookupAckClients
484: : action == WPAnswer.MODIFY ? modifyAckClients
485: : action == WPAnswer.FORWARD ? forwardAckClients
486: : action == WPAnswer.PING ? pingAckClients
487: : action == FORWARD_ANSWER ? forwardClients
488: : null);
489: }
490:
491: private void register(int action, Object c) {
492: getList(action).add(c);
493: }
494:
495: private void unregister(int action, Object c) {
496: getList(action).remove(c);
497: }
498:
499: private void tellClients(int action, MessageAddress clientAddr,
500: long clientTime, Map m) {
501: // tell our clients (refactor me?)
502: int n = (m == null ? 0 : m.size());
503: if (n == 0 && action != WPAnswer.PING) {
504: return;
505: }
506: if (action == WPAnswer.LOOKUP) {
507: List l = lookupAckClients.getUnmodifiableList();
508: for (int i = 0, ln = l.size(); i < ln; i++) {
509: LookupAckService.Client c = (LookupAckService.Client) l
510: .get(i);
511: c.lookup(clientAddr, clientTime, m);
512: }
513: } else if (action == WPAnswer.MODIFY) {
514: List l = modifyAckClients.getUnmodifiableList();
515: for (int i = 0, ln = l.size(); i < ln; i++) {
516: ModifyAckService.Client c = (ModifyAckService.Client) l
517: .get(i);
518: c.modify(clientAddr, clientTime, m);
519: }
520: } else if (action == WPAnswer.FORWARD) {
521: List l = forwardAckClients.getUnmodifiableList();
522: for (int i = 0, ln = l.size(); i < ln; i++) {
523: ForwardAckService.Client c = (ForwardAckService.Client) l
524: .get(i);
525: c.forward(clientAddr, clientTime, m);
526: }
527: } else if (action == WPAnswer.PING) {
528: List l = pingAckClients.getUnmodifiableList();
529: for (int i = 0, ln = l.size(); i < ln; i++) {
530: PingAckService.Client c = (PingAckService.Client) l
531: .get(i);
532: c.ping(clientAddr, clientTime, m);
533: }
534: } else if (action == FORWARD_ANSWER) {
535: List l = forwardClients.getUnmodifiableList();
536: for (int i = 0, ln = l.size(); i < ln; i++) {
537: ForwardService.Client c = (ForwardService.Client) l
538: .get(i);
539: c.forwardAnswer(clientAddr, clientTime, m);
540: }
541: } else if (logger.isErrorEnabled()) {
542: logger.error("Unknown action " + action);
543: }
544: }
545:
546: private void send(int action, MessageAddress clientAddr,
547: long clientTime, Map m) {
548: if ((m == null || m.isEmpty()) && (action != WPAnswer.PING)) {
549: return;
550: }
551: if (action == WPAnswer.FORWARD && !isPeer(clientAddr)) {
552: // ignore, either the local server or a non-peer
553: return;
554: }
555:
556: long now = System.currentTimeMillis();
557:
558: long timeout = (action == WPAnswer.LOOKUP ? config.lookupTimeoutMillis
559: : action == WPAnswer.PING ? config.pingTimeoutMillis
560: : config.modifyTimeoutMillis);
561: if (0 < timeout && 0 < graceTime) {
562: long diff = graceTime - now;
563: if (0 < diff && timeout < diff) {
564: timeout = diff;
565: }
566: }
567: long deadline = now + timeout;
568:
569: // tag with optional timeout attribute
570: MessageAddress target = MessageTimeoutUtils.setDeadline(
571: clientAddr, deadline);
572:
573: WPAnswer wpa = new WPAnswer(agentId, target, clientTime, now,
574: USE_SERVER_TIME, action, m);
575:
576: sendOrQueue(wpa);
577: }
578:
579: private void forward(Map m, long ttd) {
580: Set targets = getPeers();
581: int n = targets.size();
582: if (logger.isDetailEnabled()) {
583: logger.detail("forwarding " + m + " to all peers[" + n
584: + "]=" + targets + " except ourselves(" + agentId
585: + ")");
586: }
587: long now = System.currentTimeMillis();
588: long ttl = now + ttd;
589: Iterator iter = targets.iterator();
590: for (int i = 0; i < n; i++) {
591: MessageAddress target = (MessageAddress) iter.next();
592: if (agentId.equals(target.getPrimary())) {
593: // exclude the local server
594: continue;
595: }
596: // send to this target
597: target = MessageTimeoutUtils.setDeadline(target, ttl);
598: WPQuery wpq = new WPQuery(agentId, target, now,
599: WPQuery.FORWARD, m);
600: sendOrQueue(wpq);
601: }
602: }
603:
604: private void forward(MessageAddress addr, Map m, long ttd) {
605: if (!isPeer(addr)) {
606: // ignore, either the local server or a non-peer
607: return;
608: }
609: // send to this target
610: long now = System.currentTimeMillis();
611: long deadline = now + ttd;
612: MessageAddress target = MessageTimeoutUtils.setDeadline(addr,
613: deadline);
614: WPQuery wpq = new WPQuery(agentId, target, now,
615: WPQuery.FORWARD, m);
616: sendOrQueue(wpq);
617: }
618:
619: private void sendOrQueue(WhitePagesMessage m) {
620: synchronized (sendLock) {
621: if (messageSwitchService == null) {
622: // queue to send once the MTS is up
623: if (sendQueue == null) {
624: sendQueue = new ArrayList();
625: }
626: sendQueue.add(m);
627: return;
628: } else if (sendQueue != null) {
629: // flush pending messages
630: flushSendQueue();
631: } else {
632: // typical case
633: }
634: send(m);
635: }
636: }
637:
638: private void send(WhitePagesMessage m) {
639: // assert (Thread.holdsLock(sendLock));
640: // assert (messageSwitchService != null);
641: if (logger.isDetailEnabled()) {
642: logger.detail("sending message: " + m);
643: }
644: messageSwitchService.sendMessage(m);
645: }
646:
647: private void flushSendQueue() {
648: // assert (Thread.holdsLock(sendLock));
649: // assert (messageSwitchService != null);
650: List l = sendQueue;
651: sendQueue = null;
652: int n = (l == null ? 0 : l.size());
653: if (n != 0) {
654: // must drain in reverse order, since we appended
655: // to the end.
656: for (int i = n - 1; 0 <= i; i--) {
657: WhitePagesMessage m = (WhitePagesMessage) l.get(i);
658: send(m);
659: }
660: }
661: }
662:
663: private void receiveNow(WhitePagesMessage wpm) {
664: if (logger.isDetailEnabled()) {
665: logger.detail("receiving message: " + wpm);
666: }
667:
668: MessageAddress clientAddr = wpm.getOriginator();
669:
670: Map m;
671: long clientTime;
672: int action;
673: if (wpm instanceof WPQuery) {
674: WPQuery wpq = (WPQuery) wpm;
675: m = wpq.getMap();
676: clientTime = wpq.getSendTime();
677: action = wpq.getAction();
678: } else {
679: WPAnswer wpa = (WPAnswer) wpm;
680: m = wpa.getMap();
681: clientTime = wpa.getReplyTime();
682: action = FORWARD_ANSWER;
683: }
684:
685: tellClients(action, clientAddr, clientTime, m);
686: }
687:
688: //
689: // message receive queue
690: //
691:
692: private boolean receive(Message m) {
693: if (m instanceof WPQuery) {
694: // match
695: } else if (m instanceof WPAnswer) {
696: if (((WPAnswer) m).getAction() != WPAnswer.FORWARD) {
697: return false;
698: }
699: } else {
700: return false;
701: }
702: WhitePagesMessage wpm = (WhitePagesMessage) m;
703: receiveLater(wpm);
704: return true;
705: }
706:
707: private void receiveLater(WhitePagesMessage m) {
708: // queue to run in our thread
709: synchronized (receiveQueue) {
710: receiveQueue.add(m);
711: }
712: receiveThread.start();
713: }
714:
715: private void receiveNow() {
716: synchronized (receiveQueue) {
717: if (receiveQueue.isEmpty()) {
718: if (logger.isDetailEnabled()) {
719: logger.detail("input queue is empty");
720: }
721: return;
722: }
723: receiveTmp.addAll(receiveQueue);
724: receiveQueue.clear();
725: }
726: // receive messages
727: for (int i = 0, n = receiveTmp.size(); i < n; i++) {
728: WhitePagesMessage m = (WhitePagesMessage) receiveTmp.get(i);
729: receiveNow(m);
730: }
731: receiveTmp.clear();
732: }
733:
734: private void debugQueues() {
735: if (!logger.isDebugEnabled()) {
736: return;
737: }
738:
739: synchronized (receiveQueue) {
740: String s = "";
741: s += "\n##### server transport input queue ################";
742: int n = receiveQueue.size();
743: s += "\nreceive[" + n + "]: ";
744: for (int i = 0; i < n; i++) {
745: WhitePagesMessage m = (WhitePagesMessage) receiveQueue
746: .get(i);
747: s += "\n " + m;
748: }
749: s += "\n###################################################";
750: logger.debug(s);
751: }
752:
753: synchronized (sendLock) {
754: String s = "";
755: s += "\n##### server transport output queue ###############";
756: s += "\nmessageSwitchService=" + messageSwitchService;
757: int n = (sendQueue == null ? 0 : sendQueue.size());
758: s += "\nsendQueue[" + n + "]: " + sendQueue;
759: s += "\n###################################################";
760: logger.debug(s);
761: }
762:
763: // run me again later
764: debugThread.schedule(config.debugQueuesPeriod);
765: }
766:
767: private abstract class SPBase extends ServiceProviderBase {
768: protected abstract int getAction();
769:
770: protected void register(Object client) {
771: ServerTransport.this .register(getAction(), client);
772: }
773:
774: protected void unregister(Object client) {
775: ServerTransport.this .unregister(getAction(), client);
776: }
777: }
778:
779: private class PingAckSP extends SPBase {
780: protected int getAction() {
781: return WPAnswer.PING;
782: }
783:
784: protected Class getServiceClass() {
785: return PingAckService.class;
786: }
787:
788: protected Class getClientClass() {
789: return PingAckService.Client.class;
790: }
791:
792: protected Service getService(Object client) {
793: return new SI(client);
794: }
795:
796: protected class SI extends MyServiceImpl implements
797: PingAckService {
798: public SI(Object client) {
799: super (client);
800: }
801:
802: public void pingAnswer(MessageAddress clientAddr,
803: long clientTime, Map m) {
804: ServerTransport.this .send(WPAnswer.PING, clientAddr,
805: clientTime, m);
806: }
807: }
808: }
809:
810: private class LookupAckSP extends SPBase {
811: protected int getAction() {
812: return WPAnswer.LOOKUP;
813: }
814:
815: protected Class getServiceClass() {
816: return LookupAckService.class;
817: }
818:
819: protected Class getClientClass() {
820: return LookupAckService.Client.class;
821: }
822:
823: protected Service getService(Object client) {
824: return new SI(client);
825: }
826:
827: protected class SI extends MyServiceImpl implements
828: LookupAckService {
829: public SI(Object client) {
830: super (client);
831: }
832:
833: public void lookupAnswer(MessageAddress clientAddr,
834: long clientTime, Map m) {
835: ServerTransport.this .send(WPAnswer.LOOKUP, clientAddr,
836: clientTime, m);
837: }
838: }
839: }
840:
841: private class ModifyAckSP extends SPBase {
842: protected int getAction() {
843: return WPAnswer.MODIFY;
844: }
845:
846: protected Class getServiceClass() {
847: return ModifyAckService.class;
848: }
849:
850: protected Class getClientClass() {
851: return ModifyAckService.Client.class;
852: }
853:
854: protected Service getService(Object client) {
855: return new SI(client);
856: }
857:
858: protected class SI extends MyServiceImpl implements
859: ModifyAckService {
860: public SI(Object client) {
861: super (client);
862: }
863:
864: public void modifyAnswer(MessageAddress clientAddr,
865: long clientTime, Map m) {
866: ServerTransport.this .send(WPAnswer.MODIFY, clientAddr,
867: clientTime, m);
868: }
869: }
870: }
871:
872: private class ForwardAckSP extends SPBase {
873: protected int getAction() {
874: return WPAnswer.FORWARD;
875: }
876:
877: protected Class getServiceClass() {
878: return ForwardAckService.class;
879: }
880:
881: protected Class getClientClass() {
882: return ForwardAckService.Client.class;
883: }
884:
885: protected Service getService(Object client) {
886: return new SI(client);
887: }
888:
889: protected class SI extends MyServiceImpl implements
890: ForwardAckService {
891: public SI(Object client) {
892: super (client);
893: }
894:
895: public void forwardAnswer(MessageAddress clientAddr,
896: long clientTime, Map m) {
897: ServerTransport.this .send(WPAnswer.FORWARD, clientAddr,
898: clientTime, m);
899: }
900: }
901: }
902:
903: private class ForwardSP extends SPBase {
904: protected int getAction() {
905: return FORWARD_ANSWER;
906: }
907:
908: protected Class getServiceClass() {
909: return ForwardService.class;
910: }
911:
912: protected Class getClientClass() {
913: return ForwardService.Client.class;
914: }
915:
916: protected Service getService(Object client) {
917: return new SI(client);
918: }
919:
920: protected class SI extends MyServiceImpl implements
921: ForwardService {
922: public SI(Object client) {
923: super (client);
924: }
925:
926: public void forward(Map m, long ttd) {
927: ServerTransport.this .forward(m, ttd);
928: }
929:
930: public void forward(MessageAddress target, Map m, long ttd) {
931: ServerTransport.this .forward(target, m, ttd);
932: }
933: }
934: }
935:
936: /** config options */
937: private static class ServerTransportConfig {
938: public final long debugQueuesPeriod;
939: public final long graceMillis;
940: // these should match the server TTLs
941: public final long lookupTimeoutMillis;
942: public final long modifyTimeoutMillis;
943: public final long pingTimeoutMillis;
944:
945: public ServerTransportConfig(Object o) {
946: Parameters p = new Parameters(o,
947: "org.cougaar.core.wp.server.");
948: debugQueuesPeriod = p.getLong("debugQueuesPeriod", 30000);
949: graceMillis = p.getLong("graceMillis", 0);
950: lookupTimeoutMillis = p.getLong("lookupTimeoutMillis",
951: 90000);
952: modifyTimeoutMillis = p.getLong("modifyTimeoutMillis",
953: 90000);
954: pingTimeoutMillis = p.getLong("pingTimeoutMillis", 90000);
955: }
956: }
957: }
|