001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.blackboard;
028:
029: import java.io.FileWriter;
030: import java.io.IOException;
031: import java.io.ObjectInputStream;
032: import java.io.ObjectOutputStream;
033: import java.io.PrintWriter;
034: import java.io.Serializable;
035: import java.text.DateFormat;
036: import java.text.SimpleDateFormat;
037: import java.util.ArrayList;
038: import java.util.Date;
039: import java.util.HashMap;
040: import java.util.Iterator;
041: import java.util.TreeSet;
042:
043: import org.cougaar.bootstrap.SystemProperties;
044: import org.cougaar.core.agent.ClusterMessage;
045: import org.cougaar.core.agent.service.MessageSwitchService;
046: import org.cougaar.core.mts.MessageAddress;
047: import org.cougaar.util.StringUtility;
048:
049: /**
050: * A message acknowledgement manager used by the {@link Distributor}'s
051: * non-lazy persistence mode to ensure that unacknowledged messages
052: * are persisted.
053: */
054: class MessageManagerImpl implements MessageManager, Serializable {
055:
056: public static final long serialVersionUID = -8662117243114391926L;
057:
058: private static final boolean debug = SystemProperties
059: .getBoolean("org.cougaar.core.blackboard.MessageManager.debug");
060:
061: private static final long KEEP_ALIVE_INTERVAL = 55000L;
062:
063: private boolean USE_MESSAGE_MANAGER = false;
064:
065: /** The agent's mts */
066: private transient MessageAddress self;
067: private transient MessageSwitchService msgSwitch;
068:
069: private transient String agentNameForLog;
070:
071: /** Messages we need to send at the end of this epoch. */
072: private transient ArrayList stuffToSend = new ArrayList();
073:
074: /** Tracks the sequence numbers of other agents */
075: private HashMap agentInfo = new HashMap(13);
076:
077: /** Something has happened during this epoch. */
078: private transient boolean needAdvanceEpoch = false;
079:
080: /** The retransmitter thread */
081: private transient Retransmitter retransmitter;
082:
083: /** The acknowledgement sender thread */
084: private transient AcknowledgementSender ackSender;
085:
086: /** The keep alive sender thread */
087: private transient KeepAliveSender keepAliveSender;
088:
089: /** Debug logging */
090: private transient PrintWriter logWriter = null;
091:
092: /** The format of timestamps in the log */
093: private static DateFormat logTimeFormat = new SimpleDateFormat(
094: "yyyy/MM/dd HH:mm:ss.SSS");
095:
096: /**
097: * Inner static class to track the state of communication with
098: * another agent.
099: */
100: private class AgentInfo implements java.io.Serializable {
101: /** The MessageAddress of the remote agent */
102: private MessageAddress agentIdentifier;
103:
104: private long remoteIncarnationNumber = 0L;
105:
106: private long localIncarnationNumber = System
107: .currentTimeMillis();
108:
109: private transient boolean restarted = false;
110:
111: public AgentInfo(MessageAddress cid) {
112: agentIdentifier = cid;
113: }
114:
115: public MessageAddress getMessageAddress() {
116: return agentIdentifier;
117: }
118:
119: /**
120: * The last sequence number we transmited to the agent described
121: * by this AgentInfo
122: */
123: private int currentTransmitSequenceNumber = 0;
124:
125: /**
126: * The next sequence number we expect to receive from the agent
127: * described by this AgentInfo
128: */
129: private int currentReceiveSequenceNumber = 0;
130:
131: /**
132: * The record of messages that have been acknowledged. We acknowledge the highest
133:
134: /**
135: * The queue of messages that are outstanding.
136: */
137: private TreeSet outstandingMessages = new TreeSet();
138:
139: public void addOutstandingMessage(TimestampedMessage tsm) {
140: outstandingMessages.add(tsm);
141: needAdvanceEpoch = true;
142: }
143:
144: public TimestampedMessage[] getOutstandingMessages() {
145: return (TimestampedMessage[]) outstandingMessages
146: .toArray(new TimestampedMessage[outstandingMessages
147: .size()]);
148: }
149:
150: public synchronized TimestampedMessage getFirstOutstandingMessage() {
151: if (outstandingMessages.isEmpty())
152: return null;
153: return (TimestampedMessage) outstandingMessages.first();
154: }
155:
156: /** Which messages have we actually processed (need acks) */
157: private AckSet ackSet = new AckSet(1);
158:
159: private boolean needSendAcknowledgement = false;
160:
161: private transient long transmissionTime = 0;
162:
163: public synchronized long getTransmissionTime() {
164: return transmissionTime;
165: }
166:
167: public synchronized void setTransmissionTime(long now) {
168: transmissionTime = now;
169: }
170:
171: public void acknowledgeMessage(ClusterMessage aMessage) {
172: ackSet.set(aMessage.getContentsId());
173: needAdvanceEpoch = true;
174: }
175:
176: public boolean needSendAcknowledgement() {
177: return needSendAcknowledgement;
178: }
179:
180: public void setNeedSendAcknowledgment() {
181: needSendAcknowledgement = true;
182: ackSender.poke();
183: }
184:
185: public boolean getRestarted() {
186: return restarted;
187: }
188:
189: public void setRestarted(boolean newRestarted) {
190: restarted = newRestarted;
191: }
192:
193: public void advance() {
194: int oldMin = ackSet.getMinSequence();
195: if (ackSet.advance() > oldMin) {
196: needAdvanceEpoch = true; // State has change, need to persist
197: setNeedSendAcknowledgment(); // Also need to send an ack
198: }
199: }
200:
201: /**
202: * Create an acknowledgement for the current min sequence of the
203: * ackset.
204: */
205: public AckDirectiveMessage getAcknowledgement() {
206: int firstZero = ackSet.getMinSequence();
207: AckDirectiveMessage ack = new AckDirectiveMessage(
208: getMessageAddress(), self, firstZero - 1,
209: remoteIncarnationNumber);
210: needSendAcknowledgement = false;
211: return ack;
212: }
213:
214: public void receiveAck(MessageManagerImpl mm, int sequence,
215: boolean isRestart) {
216: long now = System.currentTimeMillis();
217: for (Iterator messages = outstandingMessages.iterator(); messages
218: .hasNext();) {
219: TimestampedMessage tsm = (TimestampedMessage) messages
220: .next();
221: if (tsm.getSequenceNumber() <= sequence) {
222: mm.printMessage("Remv", tsm);
223: messages.remove();
224: } else if (isRestart) {
225: tsm.setTimestamp(now); // Retransmit this ASAP
226: } else {
227: break; // Nothing left to do
228: }
229: }
230: }
231:
232: /**
233: * Check that the given message has the right sequence number.
234: * @return a code indicating whether the message is old, current,
235: * or future.
236: */
237: public int checkReceiveSequenceNumber(DirectiveMessage aMessage) {
238: int seq = aMessage.getContentsId();
239: if (remoteIncarnationNumber == 0L)
240: return FUTURE;
241: if (seq <= currentReceiveSequenceNumber)
242: return DUPLICATE;
243: if (seq > currentReceiveSequenceNumber + 1)
244: return FUTURE;
245: return PRESENT;
246: }
247:
248: public long getLocalIncarnationNumber() {
249: return localIncarnationNumber;
250: }
251:
252: public long getRemoteIncarnationNumber() {
253: return remoteIncarnationNumber;
254: }
255:
256: public void setRemoteIncarnationNumber(long incarnationNumber) {
257: remoteIncarnationNumber = incarnationNumber;
258: }
259:
260: public int getCurrentTransmitSequenceNumber() {
261: return currentTransmitSequenceNumber;
262: }
263:
264: public int getNextTransmitSequenceNumber() {
265: return ++currentTransmitSequenceNumber;
266: }
267:
268: public int getCurrentReceiveSequenceNumber() {
269: return currentReceiveSequenceNumber;
270: }
271:
272: /**
273: * Update the current receive sequence number of this other agent.
274: * @param seqno the new current sequence number of this other
275: * agent.
276: */
277: public void updateReceiveSequenceNumber(int seqno) {
278: currentReceiveSequenceNumber = seqno;
279: needAdvanceEpoch = true; // Our state changed need to persist
280: }
281:
282: public String toString() {
283: return "AgentInfo " + agentIdentifier + " "
284: + incarnationToString(localIncarnationNumber)
285: + "->"
286: + incarnationToString(remoteIncarnationNumber);
287: }
288: }
289:
290: /**
291: * Tag a message on a retransmit queue with the time at which it
292: * should next be sent.
293: */
294: private class TimestampedMessage implements Comparable,
295: java.io.Serializable {
296: protected transient long timestamp = System.currentTimeMillis();
297: private transient int nTries = 0;
298:
299: protected transient DirectiveMessage theMessage;
300:
301: private MessageAddress theDestination;
302: private int theSequenceNumber;
303: private long theIncarnationNumber;
304: private Directive[] theDirectives;
305: private AgentInfo info;
306:
307: TimestampedMessage(AgentInfo info, DirectiveMessage aMsg) {
308: this .info = info;
309: theMessage = aMsg;
310: theDestination = aMsg.getDestination();
311: theSequenceNumber = aMsg.getContentsId();
312: theIncarnationNumber = aMsg.getIncarnationNumber();
313: theDirectives = aMsg.getDirectives();
314: }
315:
316: public void send(long now) {
317: msgSwitch.sendMessage(getMessage());
318: long nextRetransmission = now
319: + retransmitSchedule[Math.min(nTries++,
320: retransmitSchedule.length - 1)];
321: setTimestamp(nextRetransmission);
322: }
323:
324: /**
325: * Get the DirectiveMessage. If theMessage is null, create a new
326: * one. theMessage will be null only after rehydration of the
327: * message manager.
328: */
329: public DirectiveMessage getMessage() {
330: if (theMessage == null) {
331: theMessage = new DirectiveMessage(getSource(),
332: getDestination(), theIncarnationNumber,
333: getDirectives());
334: theMessage.setContentsId(theSequenceNumber);
335: }
336: theMessage.setAllMessagesAcknowledged(info
337: .getFirstOutstandingMessage() == this );
338: return theMessage;
339: }
340:
341: public MessageAddress getDestination() {
342: return theDestination;
343: }
344:
345: public MessageAddress getSource() {
346: return self;
347: }
348:
349: public int getSequenceNumber() {
350: return theSequenceNumber;
351: }
352:
353: public long getIncarnationNumber() {
354: return theIncarnationNumber;
355: }
356:
357: public Directive[] getDirectives() {
358: return theDirectives;
359: }
360:
361: public void setTimestamp(long ts) {
362: timestamp = ts;
363: }
364:
365: public int compareTo(Object other) {
366: TimestampedMessage otherMsg = (TimestampedMessage) other;
367: return this .theSequenceNumber - otherMsg.theSequenceNumber;
368: }
369:
370: public String toString() {
371: StringBuffer buf = new StringBuffer();
372: buf.append("seq(");
373: buf.append(theIncarnationNumber);
374: buf.append("/");
375: buf.append(theSequenceNumber);
376: buf.append(") ");
377: StringUtility.appendArray(buf, theDirectives);
378: return buf.substring(0);
379: }
380: }
381:
382: private static long[] retransmitSchedule = { 20000L, 20000L,
383: 60000L, 120000L, 300000L };
384:
/**
 * Create a message manager.
 * @param enable stored in USE_MESSAGE_MANAGER; when false the
 * sequencing and acknowledgement logic is bypassed
 */
public MessageManagerImpl(boolean enable) {
  this.USE_MESSAGE_MANAGER = enable;
}
388:
389: public void start(MessageSwitchService msgSwitch,
390: boolean didRehydrate) {
391: self = msgSwitch.getMessageAddress();
392: this .msgSwitch = msgSwitch;
393: String agentName = self.getAddress();
394: agentNameForLog = " ".substring(Math.min(14,
395: agentName.length()))
396: + agentName + " ";
397: if (debug) {
398: try {
399: logWriter = new PrintWriter(new FileWriter(
400: "MessageManager_" + agentName + ".log",
401: true || didRehydrate));
402: printLog("MessageManager Started");
403: } catch (IOException e) {
404: System.err
405: .println("Can't open MessageManager log file: "
406: + e);
407: }
408: }
409:
410: if (USE_MESSAGE_MANAGER) {
411: retransmitter = new Retransmitter(agentName);
412: retransmitter.start();
413: ackSender = new AcknowledgementSender(agentName);
414: ackSender.start();
415: keepAliveSender = new KeepAliveSender(agentName);
416: keepAliveSender.start();
417: }
418: }
419:
420: public void stop() {
421: if (USE_MESSAGE_MANAGER) {
422: // TODO postponed until needed
423: System.err
424: .println("\nFIXME MessageManager \"stop()\" for (USE_MESSAGE_MANAGER == true) "
425: + "should halt internal threads");
426: }
427: }
428:
429: private synchronized void sendKeepAlive() {
430: ArrayList messages = new ArrayList(agentInfo.size());
431: Directive[] directives = new Directive[0];
432: long now = System.currentTimeMillis();
433: for (Iterator agents = agentInfo.values().iterator(); agents
434: .hasNext();) {
435: AgentInfo info = (AgentInfo) agents.next();
436: if (info.getFirstOutstandingMessage() == null) {
437: if (now > info.getTransmissionTime()
438: + KEEP_ALIVE_INTERVAL) {
439: DirectiveMessage ndm = new DirectiveMessage(self,
440: info.getMessageAddress(), info
441: .getLocalIncarnationNumber(),
442: directives);
443: messages.add(ndm);
444: }
445: }
446: }
447: sendMessages(messages.iterator());
448: }
449:
450: private void printMessage(String prefix, DirectiveMessage aMessage) {
451: printMessage(prefix, aMessage.getIncarnationNumber(), aMessage
452: .getContentsId(), aMessage.getSource().getAddress(),
453: aMessage.getDestination().getAddress(), (aMessage
454: .areAllMessagesAcknowledged() ? " yes" : " no")
455: + StringUtility.arrayToString(aMessage
456: .getDirectives()));
457: }
458:
459: private void printMessage(String prefix,
460: AckDirectiveMessage aMessage) {
461: printMessage(prefix, aMessage.getIncarnationNumber(), aMessage
462: .getContentsId(), aMessage.getSource().getAddress(),
463: aMessage.getDestination().getAddress(), "");
464: }
465:
466: private void printMessage(String prefix, TimestampedMessage tsm) {
467: printMessage(prefix, tsm.getIncarnationNumber(), tsm
468: .getSequenceNumber(), tsm.getSource().getAddress(), tsm
469: .getDestination().getAddress(), " ???"
470: + StringUtility.arrayToString(tsm.getDirectives()));
471: }
472:
473: private Date tDate = new Date();
474: private SimpleDateFormat incarnationFormat = new SimpleDateFormat(
475: "yyyy/MM/dd/hh:mm:ss.SSS");
476:
477: private String incarnationToString(long l) {
478: if (l == 0L)
479: return "<none>";
480: tDate.setTime(l);
481: return incarnationFormat.format(tDate);
482: }
483:
484: private void printMessage(String prefix, long incarnationNumber,
485: int sequence, String from, String to, String contents) {
486: tDate.setTime(incarnationNumber);
487: String msg = prefix + " " + sequence + " " + from + "->" + to
488: + " (" + incarnationFormat.format(tDate) + "): "
489: + contents;
490: // System.out.println(msg);
491: if (logWriter != null) {
492: printLog(msg);
493: }
494: }
495:
496: private void printLog(String msg) {
497: logWriter.print(logTimeFormat.format(new Date(System
498: .currentTimeMillis())));
499: logWriter.print(agentNameForLog);
500: logWriter.println(msg);
501: logWriter.flush();
502: }
503:
504: /**
505: * Submit a DirectiveMessage for transmission from this agent. The
506: * message is added to the set of message to be transmitted at the
507: * end of the current epoch.
508: */
509: public void sendMessages(Iterator messages) {
510: if (USE_MESSAGE_MANAGER) {
511: synchronized (this ) {
512: while (messages.hasNext()) {
513: DirectiveMessage aMessage = (DirectiveMessage) messages
514: .next();
515: AgentInfo info = getAgentInfo(aMessage
516: .getDestination());
517: if (info == null) {
518: if (debug)
519: printLog("sendMessage createNewConnection");
520: info = createNewConnection(aMessage
521: .getDestination(), 0L);
522: }
523: aMessage.setIncarnationNumber(info
524: .getLocalIncarnationNumber());
525: aMessage.setContentsId(info
526: .getNextTransmitSequenceNumber());
527: stuffToSend.add(new TimestampedMessage(info,
528: aMessage));
529: if (debug)
530: printMessage("QSnd", aMessage);
531: }
532: needAdvanceEpoch = true;
533: }
534: } else {
535: while (messages.hasNext()) {
536: msgSwitch.sendMessage((DirectiveMessage) messages
537: .next());
538: }
539: }
540: }
541:
542: private Directive[] emptyDirectives = new Directive[0];
543:
544: private void sendInitializeMessage(AgentInfo info) {
545: DirectiveMessage msg = new DirectiveMessage(self, info
546: .getMessageAddress(), info.getLocalIncarnationNumber(),
547: emptyDirectives);
548: msg.setContentsId(info.getNextTransmitSequenceNumber());
549: stuffToSend.add(new TimestampedMessage(info, msg));
550: if (debug)
551: printMessage("QSnd", msg);
552: needAdvanceEpoch = true;
553: }
554:
555: private AgentInfo getAgentInfo(MessageAddress agentIdentifier) {
556: return (AgentInfo) agentInfo.get(agentIdentifier);
557: }
558:
559: private AgentInfo createAgentInfo(MessageAddress agentIdentifier) {
560: AgentInfo info = new AgentInfo(agentIdentifier);
561: agentInfo.put(agentIdentifier, info);
562: return info;
563: }
564:
565: /**
566: * Check a received DirectiveMessage for being a duplicate.
567: * @param aMessage The received DirectiveMessage
568: * @return DUPLICATE, FUTURE, RESTART, IGNORE, or OK
569: */
570: public int receiveMessage(DirectiveMessage directiveMessage) {
571: if (!USE_MESSAGE_MANAGER)
572: return OK;
573: synchronized (this ) {
574: boolean restarted = false;
575: MessageAddress sourceIdentifier = directiveMessage
576: .getSource();
577: AgentInfo info = getAgentInfo(sourceIdentifier);
578: boolean isFirst = directiveMessage.getContentsId() == 1;
579: if (info != null) {
580: if (info.getRestarted()) {
581: restarted = true;
582: info.setRestarted(false);
583: }
584: long infoIncarnation = info
585: .getRemoteIncarnationNumber();
586: long messageIncarnation = directiveMessage
587: .getIncarnationNumber();
588: if (infoIncarnation != messageIncarnation) {
589: if (infoIncarnation == 0L) {
590: if (isFirst) {
591: info
592: .setRemoteIncarnationNumber(messageIncarnation);
593: } else {
594: if (debug)
595: printMessage("Nnz1", directiveMessage);
596: info.setNeedSendAcknowledgment();
597: return restarted ? (IGNORE | RESTART)
598: : IGNORE; // Stray message
599: }
600: } else if (messageIncarnation < infoIncarnation) {
601: if (debug)
602: printMessage("Prev", directiveMessage);
603: info.setNeedSendAcknowledgment();
604: // Message from previous incarnation of remote agent
605: return restarted ? (IGNORE | RESTART) : IGNORE;
606: } else if (messageIncarnation > infoIncarnation) {
607: // Message from new incarnation
608: if (isFirst) { // Synchronize to new incarnation
609: if (debug)
610: printLog("receiveMessage messageIncarnation > infoIncarnation");
611: info = createNewConnection(
612: sourceIdentifier, directiveMessage
613: .getIncarnationNumber());
614: restarted = true;
615: } else {
616: if (debug)
617: printMessage("Nnz2", directiveMessage);
618: info.setNeedSendAcknowledgment();
619: // Apparently new incarnation, but not sequence 0
620: return restarted ? (IGNORE | RESTART)
621: : IGNORE;
622: }
623: }
624: }
625: } else {
626: if (isFirst) {
627: if (debug)
628: printLog("receiveMessage null info is first");
629: info = createNewConnection(sourceIdentifier,
630: directiveMessage.getIncarnationNumber());
631: } else {
632: if (debug)
633: printMessage(
634: "receiveMessage null info not first",
635: directiveMessage);
636: info = createNewConnection(sourceIdentifier, 0L);
637: return IGNORE; // Must have sequence zero to synchronize
638: }
639: }
640: switch (info.checkReceiveSequenceNumber(directiveMessage)) {
641: case DUPLICATE:
642: if (debug)
643: printMessage("Dupl", directiveMessage);
644: info.setNeedSendAcknowledgment();
645: return IGNORE;
646: default:
647: case FUTURE:
648: if (directiveMessage.areAllMessagesAcknowledged()) {
649: // We are out of sync
650: if (debug)
651: printLog("receiveMessage from future all acked");
652: info = createNewConnection(sourceIdentifier, 0L);
653: return IGNORE | RESTART;
654: }
655: if (debug)
656: printMessage("Futr", directiveMessage);
657: return IGNORE; // Message out of order; ignore it
658: case OK:
659: if (debug)
660: printMessage("Rcvd", directiveMessage);
661: info.updateReceiveSequenceNumber(directiveMessage
662: .getContentsId());
663: needAdvanceEpoch = true;
664: return restarted ? (RESTART | OK) : OK;
665: }
666: }
667: }
668:
669: private AgentInfo createNewConnection(
670: MessageAddress sourceIdentifier,
671: long remoteIncarnationNumber) {
672: AgentInfo info = createAgentInfo(sourceIdentifier); // New connection
673: info.setRemoteIncarnationNumber(remoteIncarnationNumber);
674: if (debug)
675: printLog("New Connection: " + info.toString());
676: sendInitializeMessage(info);
677: return info;
678: }
679:
680: public void acknowledgeMessages(Iterator messages) {
681: if (!USE_MESSAGE_MANAGER)
682: return;
683: synchronized (this ) {
684: while (messages.hasNext()) {
685: DirectiveMessage aMessage = (DirectiveMessage) messages
686: .next();
687: if (aMessage.getContentsId() == 0)
688: return; // Not reliably sent
689: AgentInfo info = getAgentInfo(aMessage.getSource());
690: info.acknowledgeMessage(aMessage);
691: needAdvanceEpoch = true;
692: if (debug)
693: printMessage("QAck", aMessage);
694: }
695: }
696: }
697:
698: /**
699: * Process a directive acknowledgement. The acknowledged messages
700: * are removed from the retransmission queues. If the ack is marked
701: * as having been sent during a agent restart, we speed up the
702: * retransmission process to hasten the recovery process.
703: */
704: public int receiveAck(AckDirectiveMessage theAck) {
705: synchronized (this ) {
706: if (debug)
707: printMessage("RAck", theAck);
708: AgentInfo info = getAgentInfo(theAck.getSource());
709: if (info != null) {
710: boolean restarted = false;
711: if (info.getRestarted()) {
712: info.setRestarted(false);
713: restarted = true;
714: }
715: long localIncarnationNumber = info
716: .getLocalIncarnationNumber();
717: long ackIncarnationNumber = theAck
718: .getIncarnationNumber();
719: if (localIncarnationNumber == ackIncarnationNumber) {
720: int seq = theAck.getContentsId();
721: if (info.getCurrentTransmitSequenceNumber() < seq) {
722: if (debug)
723: printLog("receiveAck from future same incarnation");
724: createNewConnection(info.getMessageAddress(),
725: 0L);
726: return RESTART;
727: }
728: info.receiveAck(this , seq, false);
729: return restarted ? (RESTART | OK) : OK;
730: } else if (localIncarnationNumber < ackIncarnationNumber) {
731: // We are living in the past. We must have rehydrated with
732: // an old set of connections.
733: if (debug)
734: printLog("receiveAck from future incarnation");
735: createNewConnection(info.getMessageAddress(), 0L);
736: return RESTART;
737: } else {
738: // The other end is living in the past. Hopefully, he will
739: // eventually get with the program.
740: if (debug)
741: printLog("receiveAck from past incarnation");
742: return restarted ? (IGNORE | RESTART) : IGNORE;
743: }
744: } else {
745: return IGNORE;
746: }
747: }
748: }
749:
750: /**
751: * Determine if anything has happened during this epoch.
752: * @return true if anything has changed.
753: */
754: public boolean needAdvanceEpoch() {
755: return needAdvanceEpoch;
756: }
757:
758: /**
759: * Wrap up the current epoch and get into the correct state to be
760: * persisted. Every message that has been queued for transmission is
761: * sent. Acknowledgement numbers are advanced so we begin
762: * acknowledging messages we have received and processed. This
763: * method must be called while this MessageManager is
764: * synchronized. We purposely omit the "synchronized" here because
765: * proper operation is precluded unless the synchronization is
766: * performed externally.
767: */
768: public void advanceEpoch() {
769: if (!USE_MESSAGE_MANAGER)
770: return;
771: // Advance the information about every other agent
772: for (Iterator agents = agentInfo.values().iterator(); agents
773: .hasNext();) {
774: AgentInfo info = (AgentInfo) agents.next();
775: info.advance();
776: }
777: needAdvanceEpoch = false;
778: for (Iterator iter = stuffToSend.iterator(); iter.hasNext();) {
779: TimestampedMessage tsm = (TimestampedMessage) iter.next();
780: getAgentInfo(tsm.getDestination()).addOutstandingMessage(
781: tsm);
782: retransmitter.poke();
783: }
784: stuffToSend.clear();
785: if (logWriter != null) {
786: printLog("Advanced epoch");
787: }
788: }
789:
790: private class KeepAliveSender extends Thread {
791: public KeepAliveSender(String agentName) {
792: super ("Keep Alive Sender/" + agentName);
793: }
794:
795: public void run() {
796: while (true) {
797: sendKeepAlive();
798: try {
799: sleep(KEEP_ALIVE_INTERVAL);
800: } catch (InterruptedException ie) {
801: }
802: }
803: }
804: }
805:
806: private class AcknowledgementSender extends Thread {
807: private boolean poked = false;
808: ArrayList acksToSend = new ArrayList();
809:
810: public AcknowledgementSender(String agentName) {
811: super ("Ack Sender/" + agentName);
812: }
813:
814: public synchronized void poke() {
815: poked = true;
816: AcknowledgementSender.this .notify();
817: }
818:
819: public void run() {
820: while (true) {
821: synchronized (AcknowledgementSender.this ) {
822: while (!poked) {
823: try {
824: AcknowledgementSender.this .wait();
825: } catch (InterruptedException ie) {
826: }
827: }
828: poked = false;
829: }
830: synchronized (MessageManagerImpl.this ) {
831: for (Iterator agents = agentInfo.values()
832: .iterator(); agents.hasNext();) {
833: AgentInfo info = (AgentInfo) agents.next();
834: if (info.needSendAcknowledgement()) {
835: acksToSend.add(info.getAcknowledgement());
836: }
837: }
838: }
839: for (Iterator iter = acksToSend.iterator(); iter
840: .hasNext();) {
841: AckDirectiveMessage ack = (AckDirectiveMessage) iter
842: .next();
843: if (debug)
844: printMessage("SAck", ack);
845: msgSwitch.sendMessage(ack);
846: }
847: acksToSend.clear();
848: }
849: }
850: }
851:
852: private class Retransmitter extends Thread {
853: private boolean poked = false;
854: private ArrayList messagesToRetransmit = new ArrayList();
855:
856: public Retransmitter(String agentName) {
857: super (agentName + "/Message Manager");
858: }
859:
860: public synchronized void poke() {
861: poked = true;
862: Retransmitter.this .notify();
863: }
864:
865: /**
866: * Retransmit messages that have not been acknowledged. Iterate
867: * through all the agents for which we have AgentInfo and
868: * interate through all the outstandmessage that have been sent to
869: * that agent. Check the time to retransmit of the message and if
870: * the current time has passed that time, then retransmit the
871: * message. Keep the earliest time of any message that is not ready
872: * to be retransmitted and sleep long enough so that there could be
873: * at least one message to retransmit when we awaken.
874: */
875: public void run() {
876: while (true) {
877: try {
878: long now = System.currentTimeMillis();
879: long earliestTime = now + retransmitSchedule[0];
880: synchronized (MessageManagerImpl.this ) {
881: for (Iterator agents = agentInfo.values()
882: .iterator(); agents.hasNext();) {
883: AgentInfo info = (AgentInfo) agents.next();
884: TimestampedMessage tsm = info
885: .getFirstOutstandingMessage();
886: if (tsm == null)
887: continue;
888: if (tsm.timestamp <= now) {
889: TimestampedMessage[] messages = info
890: .getOutstandingMessages();
891: info.setTransmissionTime(now);
892: messagesToRetransmit
893: .addAll(java.util.Arrays
894: .asList(messages));
895: } else if (tsm.timestamp < earliestTime) {
896: earliestTime = tsm.timestamp;
897: }
898: }
899: }
900: if (!messagesToRetransmit.isEmpty()) {
901: for (Iterator iter = messagesToRetransmit
902: .iterator(); iter.hasNext();) {
903: TimestampedMessage tsm = (TimestampedMessage) iter
904: .next();
905: tsm.send(now);
906: if (tsm.timestamp < earliestTime) {
907: earliestTime = tsm.timestamp;
908: }
909: if (debug)
910: printMessage(tsm.nTries == 1 ? "Send"
911: : ("Rxm" + tsm.nTries), tsm);
912: }
913: messagesToRetransmit.clear();
914: }
915: synchronized (Retransmitter.this ) {
916: if (!poked) {
917: long sleepTime = 5000L + earliestTime - now;
918: if (sleepTime > 30000L)
919: sleepTime = 30000L;
920: Retransmitter.this .wait(sleepTime);
921: }
922: poked = false;
923: }
924: } catch (Exception e) {
925: e.printStackTrace();
926: }
927: }
928: }
929: }
930:
931: /** Serialize ourselves. Used for persistence. */
932: private void writeObject(ObjectOutputStream os) throws IOException {
933: synchronized (this ) {
934: if (stuffToSend.size() > 0) {
935: throw new IOException("Non-empty stuffToSend");
936: }
937: os.defaultWriteObject();
938: }
939: }
940:
941: private void readObject(ObjectInputStream is) throws IOException,
942: ClassNotFoundException {
943: is.defaultReadObject();
944: stuffToSend = new ArrayList();
945: needAdvanceEpoch = false;
946: // for (Iterator agents = agentInfo.values().iterator(); agents.hasNext(); ) {
947: // AgentInfo info = (AgentInfo) agents.next();
948: // info.setRestarted(true);
949: // }
950: }
951: }
|