001: // $Id: Digest.java,v 1.19 2006/05/02 09:03:28 belaban Exp $
002:
003: package org.jgroups.protocols.pbcast;
004:
005: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008: import org.jgroups.Address;
009: import org.jgroups.Global;
010: import org.jgroups.util.Streamable;
011: import org.jgroups.util.Util;
012:
013: import java.io.*;
014: import java.util.Iterator;
015: import java.util.Map;
016: import java.util.Set;
017:
018: /**
019: * A message digest, which is used by the PBCAST layer for gossiping (also used by NAKACK for
020: * keeping track of current seqnos for all members). It contains pairs of senders and a range of seqnos
021: * (low and high), where each sender is associated with its highest and lowest seqnos seen so far. That
022: * is, the lowest seqno which was not yet garbage-collected and the highest that was seen so far and is
023: * deliverable (or was already delivered) to the application. A range of [0 - 0] means no messages have
024: * been received yet.
025: * <p> April 3 2001 (bela): Added high_seqnos_seen member. It is used to disseminate
026: * information about the last (highest) message M received from a sender P. Since we might be using a
027: * negative acknowledgment message numbering scheme, we would never know if the last message was
028: * lost. Therefore we periodically gossip and include the last message seqno. Members who haven't seen
029: * it (e.g. because msg was dropped) will request a retransmission. See DESIGN for details.
030: * @author Bela Ban
031: */
032: public class Digest implements Externalizable, Streamable {
033: /** Map<Address, Entry> */
034: Map senders = null;
035: protected static final Log log = LogFactory.getLog(Digest.class);
036: static final boolean warn = log.isWarnEnabled();
037:
038: public Digest() {
039: } // used for externalization
040:
041: public Digest(int size) {
042: senders = createSenders(size);
043: }
044:
045: public boolean equals(Object obj) {
046: if (obj == null)
047: return false;
048: Digest other = (Digest) obj;
049: if (senders == null && other.senders == null)
050: return true;
051: return senders.equals(other.senders);
052: }
053:
054: public void add(Address sender, long low_seqno, long high_seqno) {
055: add(sender, low_seqno, high_seqno, -1);
056: }
057:
058: public void add(Address sender, long low_seqno, long high_seqno,
059: long high_seqno_seen) {
060: add(sender, new Entry(low_seqno, high_seqno, high_seqno_seen));
061: }
062:
063: private void add(Address sender, Entry entry) {
064: if (sender == null || entry == null) {
065: if (log.isErrorEnabled())
066: log.error("sender (" + sender + ") or entry (" + entry
067: + ")is null, will not add entry");
068: return;
069: }
070: Object retval = senders.put(sender, entry);
071: if (retval != null && warn)
072: log.warn("entry for " + sender + " was overwritten with "
073: + entry);
074: }
075:
076: public void add(Digest d) {
077: if (d != null) {
078: Map.Entry entry;
079: Address key;
080: Entry val;
081: for (Iterator it = d.senders.entrySet().iterator(); it
082: .hasNext();) {
083: entry = (Map.Entry) it.next();
084: key = (Address) entry.getKey();
085: val = (Entry) entry.getValue();
086: add(key, val.low_seqno, val.high_seqno,
087: val.high_seqno_seen);
088: }
089: }
090: }
091:
092: public void replace(Digest d) {
093: if (d != null) {
094: Map.Entry entry;
095: Address key;
096: Entry val;
097: clear();
098: for (Iterator it = d.senders.entrySet().iterator(); it
099: .hasNext();) {
100: entry = (Map.Entry) it.next();
101: key = (Address) entry.getKey();
102: val = (Entry) entry.getValue();
103: add(key, val.low_seqno, val.high_seqno,
104: val.high_seqno_seen);
105: }
106: }
107: }
108:
109: public Entry get(Address sender) {
110: return (Entry) senders.get(sender);
111: }
112:
113: public boolean set(Address sender, long low_seqno, long high_seqno,
114: long high_seqno_seen) {
115: Entry entry = (Entry) senders.get(sender);
116: if (entry == null)
117: return false;
118: entry.low_seqno = low_seqno;
119: entry.high_seqno = high_seqno;
120: entry.high_seqno_seen = high_seqno_seen;
121: return true;
122: }
123:
124: /**
125: * Adds a digest to this digest. This digest must have enough space to add the other digest; otherwise an error
126: * message will be written. For each sender in the other digest, the merge() method will be called.
127: */
128: public void merge(Digest d) {
129: if (d == null) {
130: if (log.isErrorEnabled())
131: log.error("digest to be merged with is null");
132: return;
133: }
134: Map.Entry entry;
135: Address sender;
136: Entry val;
137: for (Iterator it = d.senders.entrySet().iterator(); it
138: .hasNext();) {
139: entry = (Map.Entry) it.next();
140: sender = (Address) entry.getKey();
141: val = (Entry) entry.getValue();
142: if (val != null) {
143: merge(sender, val.low_seqno, val.high_seqno,
144: val.high_seqno_seen);
145: }
146: }
147: }
148:
149: /**
150: * Similar to add(), but if the sender already exists, its seqnos will be modified (no new entry) as follows:
151: * <ol>
152: * <li>this.low_seqno=min(this.low_seqno, low_seqno)
153: * <li>this.high_seqno=max(this.high_seqno, high_seqno)
154: * <li>this.high_seqno_seen=max(this.high_seqno_seen, high_seqno_seen)
155: * </ol>
156: * If the sender doesn not exist, a new entry will be added (provided there is enough space)
157: */
158: public void merge(Address sender, long low_seqno, long high_seqno,
159: long high_seqno_seen) {
160: if (sender == null) {
161: if (log.isErrorEnabled())
162: log.error("sender == null");
163: return;
164: }
165: Entry entry = (Entry) senders.get(sender);
166: if (entry == null) {
167: add(sender, low_seqno, high_seqno, high_seqno_seen);
168: } else {
169: if (low_seqno < entry.low_seqno)
170: entry.low_seqno = low_seqno;
171: if (high_seqno > entry.high_seqno)
172: entry.high_seqno = high_seqno;
173: if (high_seqno_seen > entry.high_seqno_seen)
174: entry.high_seqno_seen = high_seqno_seen;
175: }
176: }
177:
178: public boolean contains(Address sender) {
179: return senders.containsKey(sender);
180: }
181:
182: /**
183: * Compares two digests and returns true if the senders are the same, otherwise false.
184: * @param other
185: * @return True if senders are the same, otherwise false.
186: */
187: public boolean sameSenders(Digest other) {
188: if (other == null)
189: return false;
190: if (this .senders == null || other.senders == null)
191: return false;
192: if (this .senders.size() != other.senders.size())
193: return false;
194:
195: Set my_senders = senders.keySet(), other_senders = other.senders
196: .keySet();
197: return my_senders.equals(other_senders);
198: }
199:
200: /**
201: * Increments the sender's high_seqno by 1.
202: */
203: public void incrementHighSeqno(Address sender) {
204: Entry entry = (Entry) senders.get(sender);
205: if (entry == null)
206: return;
207: entry.high_seqno++;
208: }
209:
210: public int size() {
211: return senders.size();
212: }
213:
214: /**
215: * Resets the seqnos for the sender at 'index' to 0. This happens when a member has left the group,
216: * but it is still in the digest. Resetting its seqnos ensures that no-one will request a message
217: * retransmission from the dead member.
218: */
219: public void resetAt(Address sender) {
220: Entry entry = (Entry) senders.get(sender);
221: if (entry != null)
222: entry.reset();
223: }
224:
225: public void clear() {
226: senders.clear();
227: }
228:
229: public long lowSeqnoAt(Address sender) {
230: Entry entry = (Entry) senders.get(sender);
231: if (entry == null)
232: return -1;
233: else
234: return entry.low_seqno;
235: }
236:
237: public long highSeqnoAt(Address sender) {
238: Entry entry = (Entry) senders.get(sender);
239: if (entry == null)
240: return -1;
241: else
242: return entry.high_seqno;
243: }
244:
245: public long highSeqnoSeenAt(Address sender) {
246: Entry entry = (Entry) senders.get(sender);
247: if (entry == null)
248: return -1;
249: else
250: return entry.high_seqno_seen;
251: }
252:
253: public void setHighSeqnoAt(Address sender, long high_seqno) {
254: Entry entry = (Entry) senders.get(sender);
255: if (entry != null)
256: entry.high_seqno = high_seqno;
257: }
258:
259: public void setHighSeqnoSeenAt(Address sender, long high_seqno_seen) {
260: Entry entry = (Entry) senders.get(sender);
261: if (entry != null)
262: entry.high_seqno_seen = high_seqno_seen;
263: }
264:
265: public void setHighestDeliveredAndSeenSeqnos(Address sender,
266: long high_seqno, long high_seqno_seen) {
267: Entry entry = (Entry) senders.get(sender);
268: if (entry != null) {
269: entry.high_seqno = high_seqno;
270: entry.high_seqno_seen = high_seqno_seen;
271: }
272: }
273:
274: public Digest copy() {
275: Digest ret = new Digest(senders.size());
276: Map.Entry entry;
277: Entry tmp;
278: for (Iterator it = senders.entrySet().iterator(); it.hasNext();) {
279: entry = (Map.Entry) it.next();
280: tmp = (Entry) entry.getValue();
281: ret.add((Address) entry.getKey(), tmp.low_seqno,
282: tmp.high_seqno, tmp.high_seqno_seen);
283: }
284: return ret;
285: }
286:
287: public String toString() {
288: StringBuffer sb = new StringBuffer();
289: boolean first = true;
290: if (senders == null)
291: return "[]";
292: Map.Entry entry;
293: Address key;
294: Entry val;
295:
296: for (Iterator it = senders.entrySet().iterator(); it.hasNext();) {
297: entry = (Map.Entry) it.next();
298: key = (Address) entry.getKey();
299: val = (Entry) entry.getValue();
300: if (!first) {
301: sb.append(", ");
302: } else {
303: first = false;
304: }
305: sb.append(key).append(": ").append('[').append(
306: val.low_seqno).append(" : ");
307: sb.append(val.high_seqno);
308: if (val.high_seqno_seen >= 0)
309: sb.append(" (").append(val.high_seqno_seen).append(")");
310: sb.append("]");
311: }
312: return sb.toString();
313: }
314:
315: public String printHighSeqnos() {
316: StringBuffer sb = new StringBuffer();
317: boolean first = true;
318: Map.Entry entry;
319: Address key;
320: Entry val;
321:
322: for (Iterator it = senders.entrySet().iterator(); it.hasNext();) {
323: entry = (Map.Entry) it.next();
324: key = (Address) entry.getKey();
325: val = (Entry) entry.getValue();
326: if (!first) {
327: sb.append(", ");
328: } else {
329: sb.append('[');
330: first = false;
331: }
332: sb.append(key).append("#").append(val.high_seqno);
333: }
334: sb.append(']');
335: return sb.toString();
336: }
337:
338: public String printHighSeqnosSeen() {
339: StringBuffer sb = new StringBuffer();
340: boolean first = true;
341: Map.Entry entry;
342: Address key;
343: Entry val;
344:
345: for (Iterator it = senders.entrySet().iterator(); it.hasNext();) {
346: entry = (Map.Entry) it.next();
347: key = (Address) entry.getKey();
348: val = (Entry) entry.getValue();
349: if (!first) {
350: sb.append(", ");
351: } else {
352: sb.append('[');
353: first = false;
354: }
355: sb.append(key).append("#").append(val.high_seqno_seen);
356: }
357: sb.append(']');
358: return sb.toString();
359: }
360:
361: public void writeExternal(ObjectOutput out) throws IOException {
362: out.writeObject(senders);
363: }
364:
365: public void readExternal(ObjectInput in) throws IOException,
366: ClassNotFoundException {
367: senders = (Map) in.readObject();
368: }
369:
370: public void writeTo(DataOutputStream out) throws IOException {
371: out.writeShort(senders.size());
372: Map.Entry entry;
373: Address key;
374: Entry val;
375: for (Iterator it = senders.entrySet().iterator(); it.hasNext();) {
376: entry = (Map.Entry) it.next();
377: key = (Address) entry.getKey();
378: val = (Entry) entry.getValue();
379: Util.writeAddress(key, out);
380: out.writeLong(val.low_seqno);
381: out.writeLong(val.high_seqno);
382: out.writeLong(val.high_seqno_seen);
383: }
384: }
385:
386: public void readFrom(DataInputStream in) throws IOException,
387: IllegalAccessException, InstantiationException {
388: short size = in.readShort();
389: senders = createSenders(size);
390: Address key;
391: for (int i = 0; i < size; i++) {
392: key = Util.readAddress(in);
393: add(key, in.readLong(), in.readLong(), in.readLong());
394: }
395: }
396:
397: public long serializedSize() {
398: long retval = Global.SHORT_SIZE; // number of elements in 'senders'
399: if (senders.size() > 0) {
400: Address addr = (Address) senders.keySet().iterator().next();
401: int len = addr.size() + 2 * Global.BYTE_SIZE; // presence byte, IpAddress vs other address
402: len += 3 * Global.LONG_SIZE; // 3 longs in one Entry
403: retval += len * senders.size();
404: }
405: return retval;
406: }
407:
408: private static Map createSenders(int size) {
409: return new ConcurrentReaderHashMap(size);
410: }
411:
412: /**
413: * Class keeping track of the lowest and highest sequence numbers delivered, and the highest
414: * sequence numbers received, per member
415: */
416: public static class Entry implements Externalizable {
417: public long low_seqno, high_seqno, high_seqno_seen = -1;
418:
419: public Entry() {
420: }
421:
422: public Entry(long low_seqno, long high_seqno,
423: long high_seqno_seen) {
424: this .low_seqno = low_seqno;
425: this .high_seqno = high_seqno;
426: this .high_seqno_seen = high_seqno_seen;
427: }
428:
429: public Entry(long low_seqno, long high_seqno) {
430: this .low_seqno = low_seqno;
431: this .high_seqno = high_seqno;
432: }
433:
434: public Entry(Entry other) {
435: if (other != null) {
436: low_seqno = other.low_seqno;
437: high_seqno = other.high_seqno;
438: high_seqno_seen = other.high_seqno_seen;
439: }
440: }
441:
442: public boolean equals(Object obj) {
443: Entry other = (Entry) obj;
444: return low_seqno == other.low_seqno
445: && high_seqno == other.high_seqno
446: && high_seqno_seen == other.high_seqno_seen;
447: }
448:
449: public String toString() {
450: return new StringBuffer("low=").append(low_seqno).append(
451: ", high=").append(high_seqno).append(
452: ", highest seen=").append(high_seqno_seen)
453: .toString();
454: }
455:
456: public void reset() {
457: low_seqno = high_seqno = 0;
458: high_seqno_seen = -1;
459: }
460:
461: public void writeExternal(ObjectOutput out) throws IOException {
462: out.writeLong(low_seqno);
463: out.writeLong(high_seqno);
464: out.writeLong(high_seqno_seen);
465: }
466:
467: public void readExternal(ObjectInput in) throws IOException,
468: ClassNotFoundException {
469: low_seqno = in.readLong();
470: high_seqno = in.readLong();
471: high_seqno_seen = in.readLong();
472: }
473: }
474: }
|