001: // $Id: UNICAST.java,v 1.63.2.4 2007/04/27 08:03:52 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.AckReceiverWindow;
007: import org.jgroups.stack.AckSenderWindow;
008: import org.jgroups.stack.Protocol;
009: import org.jgroups.util.BoundedList;
010: import org.jgroups.util.Streamable;
011: import org.jgroups.util.TimeScheduler;
012: import org.jgroups.util.Util;
013:
014: import java.io.*;
015: import java.util.*;
016:
017: /**
018: * Reliable unicast layer. Uses acknowledgement scheme similar to TCP to provide lossless transmission
019: * of unicast messages (for reliable multicast see NAKACK layer). When a message is sent to a peer for
020: * the first time, we add the pair <peer_addr, Entry> to the hashtable (peer address is the key). All
021: * messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a
022: * message from a peer for the first time, another entry will be created and added to the hashtable
023: * (unless already existing). Msgs will then be added to hashtable.peer_addr.received_msgs.<p> This
024: * layer is used to reliably transmit point-to-point messages, that is, either messages sent to a
025: * single receiver (vs. messages multicast to a group) or for example replies to a multicast message. The
026: * sender uses an <code>AckSenderWindow</code> which retransmits messages for which it hasn't received
027: * an ACK, the receiver uses <code>AckReceiverWindow</code> which keeps track of the lowest seqno
028: * received so far, and keeps messages in order.<p>
029: * Messages in both AckSenderWindows and AckReceiverWindows will be removed. A message will be removed from
030: * AckSenderWindow when an ACK has been received for it and messages will be removed from AckReceiverWindow
031: * whenever a message is received: the new message is added and then we try to remove as many messages as
032: * possible (until we stop at a gap, or there are no more messages).
033: * @author Bela Ban
034: */
035: public class UNICAST extends Protocol implements
036: AckSenderWindow.RetransmitCommand {
037: private final Vector members = new Vector(11);
038: private final HashMap connections = new HashMap(11); // Object (sender or receiver) -- Entries
039: private long[] timeout = { 400, 800, 1600, 3200 }; // for AckSenderWindow: max time to wait for missing acks
040: private Address local_addr = null;
041: private TimeScheduler timer = null; // used for retransmissions (passed to AckSenderWindow)
042:
043: // if UNICAST is used without GMS, don't consult the membership on retransmit() if use_gms=false
044: // default is true
045: private boolean use_gms = true;
046: private boolean started = false;
047:
048: /** A list of members who left, used to determine when to prevent sending messages to left mbrs */
049: private final BoundedList previous_members = new BoundedList(50);
050:
051: private final static String name = "UNICAST";
052: private static final long DEFAULT_FIRST_SEQNO = 1;
053:
054: private long num_msgs_sent = 0, num_msgs_received = 0,
055: num_bytes_sent = 0, num_bytes_received = 0;
056: private long num_acks_sent = 0, num_acks_received = 0,
057: num_xmit_requests_received = 0;
058:
059: /** All protocol names have to be unique ! */
060: public String getName() {
061: return name;
062: }
063:
064: public String getLocalAddress() {
065: return local_addr != null ? local_addr.toString() : "null";
066: }
067:
068: public String getMembers() {
069: return members != null ? members.toString() : "[]";
070: }
071:
072: public String printConnections() {
073: StringBuffer sb = new StringBuffer();
074: Map.Entry entry;
075: for (Iterator it = connections.entrySet().iterator(); it
076: .hasNext();) {
077: entry = (Map.Entry) it.next();
078: sb.append(entry.getKey()).append(": ").append(
079: entry.getValue()).append("\n");
080: }
081: return sb.toString();
082: }
083:
084: public long getNumMessagesSent() {
085: return num_msgs_sent;
086: }
087:
088: public long getNumMessagesReceived() {
089: return num_msgs_received;
090: }
091:
092: public long getNumBytesSent() {
093: return num_bytes_sent;
094: }
095:
096: public long getNumBytesReceived() {
097: return num_bytes_received;
098: }
099:
100: public long getNumAcksSent() {
101: return num_acks_sent;
102: }
103:
104: public long getNumAcksReceived() {
105: return num_acks_received;
106: }
107:
108: public long getNumberOfRetransmitRequestsReceived() {
109: return num_xmit_requests_received;
110: }
111:
112: /** The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) */
113: public int getNumberOfUnackedMessages() {
114: int num = 0;
115: Entry entry;
116: synchronized (connections) {
117: for (Iterator it = connections.values().iterator(); it
118: .hasNext();) {
119: entry = (Entry) it.next();
120: if (entry.sent_msgs != null)
121: num += entry.sent_msgs.size();
122: }
123: }
124: return num;
125: }
126:
127: public String getUnackedMessages() {
128: StringBuffer sb = new StringBuffer();
129: Map.Entry entry;
130: Entry e;
131: Object member;
132: synchronized (connections) {
133: for (Iterator it = connections.entrySet().iterator(); it
134: .hasNext();) {
135: entry = (Map.Entry) it.next();
136: member = entry.getKey();
137: e = (Entry) entry.getValue();
138: sb.append(member).append(": ");
139: if (e.sent_msgs != null)
140: sb.append(e.sent_msgs.toString()).append("\n");
141: }
142: }
143: return sb.toString();
144: }
145:
146: public int getNumberOfMessagesInReceiveWindows() {
147: int num = 0;
148: Entry entry;
149: synchronized (connections) {
150: for (Iterator it = connections.values().iterator(); it
151: .hasNext();) {
152: entry = (Entry) it.next();
153: if (entry.received_msgs != null)
154: num += entry.received_msgs.size();
155: }
156: }
157: return num;
158: }
159:
160: public void resetStats() {
161: num_msgs_sent = num_msgs_received = num_bytes_sent = num_bytes_received = num_acks_sent = num_acks_received = 0;
162: num_xmit_requests_received = 0;
163: }
164:
165: public Map dumpStats() {
166: Map m = new HashMap();
167: m.put("num_msgs_sent", new Long(num_msgs_sent));
168: m.put("num_msgs_received", new Long(num_msgs_received));
169: m.put("num_bytes_sent", new Long(num_bytes_sent));
170: m.put("num_bytes_received", new Long(num_bytes_received));
171: m.put("num_acks_sent", new Long(num_acks_sent));
172: m.put("num_acks_received", new Long(num_acks_received));
173: m.put("num_xmit_requests_received", new Long(
174: num_xmit_requests_received));
175: m.put("num_unacked_msgs",
176: new Long(getNumberOfUnackedMessages()));
177: m.put("unacked_msgs", getUnackedMessages());
178: m.put("num_msgs_in_recv_windows", new Long(
179: getNumberOfMessagesInReceiveWindows()));
180: return m;
181: }
182:
183: public boolean setProperties(Properties props) {
184: String str;
185: long[] tmp;
186:
187: super .setProperties(props);
188: str = props.getProperty("timeout");
189: if (str != null) {
190: tmp = Util.parseCommaDelimitedLongs(str);
191: if (tmp != null && tmp.length > 0)
192: timeout = tmp;
193: props.remove("timeout");
194: }
195:
196: str = props.getProperty("window_size");
197: if (str != null) {
198: props.remove("window_size");
199: log.warn("window_size is deprecated and will be ignored");
200: }
201:
202: str = props.getProperty("min_threshold");
203: if (str != null) {
204: props.remove("min_threshold");
205: log.warn("min_threshold is deprecated and will be ignored");
206: }
207:
208: str = props.getProperty("use_gms");
209: if (str != null) {
210: use_gms = Boolean.valueOf(str).booleanValue();
211: props.remove("use_gms");
212: }
213:
214: if (props.size() > 0) {
215: log.error("these properties are not recognized: " + props);
216: return false;
217: }
218: return true;
219: }
220:
221: public void start() throws Exception {
222: timer = stack != null ? stack.timer : null;
223: if (timer == null)
224: throw new Exception("timer is null");
225: started = true;
226: }
227:
228: public void stop() {
229: started = false;
230: removeAllConnections();
231: }
232:
233: public void up(Event evt) {
234: Message msg;
235: Address dst, src;
236: UnicastHeader hdr;
237:
238: switch (evt.getType()) {
239:
240: case Event.MSG:
241: msg = (Message) evt.getArg();
242: dst = msg.getDest();
243:
244: if (dst == null || dst.isMulticastAddress()) // only handle unicast messages
245: break; // pass up
246:
247: // changed from removeHeader(): we cannot remove the header because if we do loopback=true at the
248: // transport level, we will not have the header on retransmit ! (bela Aug 22 2006)
249: hdr = (UnicastHeader) msg.getHeader(name);
250: if (hdr == null)
251: break;
252: src = msg.getSrc();
253: switch (hdr.type) {
254: case UnicastHeader.DATA: // received regular message
255: if (handleDataReceived(src, hdr.seqno, msg))
256: sendAck(src, hdr.seqno); // only send an ACK if added to the received_msgs table (bela Aug 2006)
257: return; // we pass the deliverable message up in handleDataReceived()
258: case UnicastHeader.ACK: // received ACK for previously sent message
259: handleAckReceived(src, hdr.seqno);
260: break;
261: default:
262: log.error("UnicastHeader type " + hdr.type
263: + " not known !");
264: break;
265: }
266: return;
267:
268: case Event.SET_LOCAL_ADDRESS:
269: local_addr = (Address) evt.getArg();
270: break;
271: }
272:
273: passUp(evt); // Pass up to the layer above us
274: }
275:
276: public void down(Event evt) {
277: switch (evt.getType()) {
278:
279: case Event.MSG: // Add UnicastHeader, add to AckSenderWindow and pass down
280: Message msg = (Message) evt.getArg();
281: Object dst = msg.getDest();
282:
283: /* only handle unicast messages */
284: if (dst == null || ((Address) dst).isMulticastAddress()) {
285: break;
286: }
287:
288: if (previous_members.contains(dst)) {
289: if (log.isTraceEnabled())
290: log.trace("discarding message to " + dst
291: + " as this member left the group,"
292: + " previous_members=" + previous_members);
293: return;
294: }
295:
296: if (!started) {
297: if (log.isWarnEnabled())
298: log
299: .warn("discarded message as start() has not yet been called, message: "
300: + msg);
301: return;
302: }
303:
304: Entry entry;
305: synchronized (connections) {
306: entry = (Entry) connections.get(dst);
307: if (entry == null) {
308: entry = new Entry();
309: connections.put(dst, entry);
310: if (log.isTraceEnabled())
311: log.trace(local_addr
312: + ": created new connection for dst "
313: + dst);
314: }
315: }
316:
317: Message tmp;
318: synchronized (entry) { // threads will only sync if they access the same entry
319: long seqno = -2;
320:
321: try {
322: seqno = entry.sent_msgs_seqno;
323: UnicastHeader hdr = new UnicastHeader(
324: UnicastHeader.DATA, seqno);
325: if (entry.sent_msgs == null) { // first msg to peer 'dst'
326: entry.sent_msgs = new AckSenderWindow(this ,
327: timeout, timer, this .local_addr); // use the protocol stack's timer
328: }
329: msg.putHeader(name, hdr);
330: if (log.isTraceEnabled())
331: log.trace(new StringBuffer().append(local_addr)
332: .append(" --> DATA(").append(dst)
333: .append(": #").append(seqno));
334: tmp = Global.copy ? msg.copy() : msg;
335: entry.sent_msgs.add(seqno, tmp); // add *including* UnicastHeader, adds to retransmitter
336: entry.sent_msgs_seqno++;
337: } catch (Throwable t) {
338: entry.sent_msgs.ack(seqno); // remove seqno again, so it is not transmitted
339: if (t instanceof Error)
340: throw (Error) t;
341: if (t instanceof RuntimeException)
342: throw (RuntimeException) t;
343: else {
344: throw new RuntimeException(
345: "failure adding msg " + msg
346: + " to the retransmit table", t);
347: }
348: }
349: }
350: // moved passing down of message out of the synchronized block: similar to NAKACK, we do *not* need
351: // to send unicast messages in order of sequence numbers because they will be sorted into the correct
352: // order at the receiver anyway. Of course, most of the time, the order will be correct (FIFO), so
353: // the cost of reordering is minimal. This is part of http://jira.jboss.com/jira/browse/JGRP-303
354: try {
355: passDown(new Event(Event.MSG, tmp));
356: num_msgs_sent++;
357: num_bytes_sent += msg.getLength();
358: } catch (Throwable t) { // eat the exception, don't pass it up the stack
359: if (log.isWarnEnabled()) {
360: log.warn("failure passing message down", t);
361: }
362: }
363:
364: msg = null;
365: return; // we already passed the msg down
366:
367: case Event.VIEW_CHANGE: // remove connections to peers that are not members anymore !
368: Vector new_members = ((View) evt.getArg()).getMembers();
369: Vector left_members;
370: synchronized (members) {
371: left_members = Util.determineLeftMembers(members,
372: new_members);
373: members.clear();
374: if (new_members != null)
375: members.addAll(new_members);
376: }
377:
378: // Remove all connections for members that left between the current view and the new view
379: // See DESIGN for details
380: boolean rc;
381: if (use_gms && left_members.size() > 0) {
382: Object mbr;
383: for (int i = 0; i < left_members.size(); i++) {
384: mbr = left_members.elementAt(i);
385: rc = removeConnection(mbr); // adds to previous_members
386: if (rc && log.isTraceEnabled())
387: log.trace("removed " + mbr
388: + " from connection table, member(s) "
389: + left_members + " left");
390: }
391: }
392: // code by Matthias Weber May 23 2006
393: for (Enumeration e = previous_members.elements(); e
394: .hasMoreElements();) {
395: Object mbr = e.nextElement();
396: if (members.contains(mbr)) {
397: if (previous_members.removeElement(mbr) != null) {
398: if (log.isTraceEnabled())
399: log
400: .trace("removing "
401: + mbr
402: + " from previous_members as result of VIEW_CHANGE event, "
403: + "previous_members="
404: + previous_members);
405: }
406: }
407: }
408: break;
409:
410: case Event.ENABLE_UNICASTS_TO:
411: Object member = evt.getArg();
412: previous_members.removeElement(member);
413: if (log.isTraceEnabled())
414: log
415: .trace("removing "
416: + member
417: + " from previous_members as result of ENABLE_UNICAST_TO event, "
418: + "previous_members="
419: + previous_members);
420: break;
421: }
422:
423: passDown(evt); // Pass on to the layer below us
424: }
425:
426: /** Removes and resets from connection table (which is already locked). Returns true if member was found, otherwise false */
427: private boolean removeConnection(Object mbr) {
428: Entry entry;
429:
430: synchronized (connections) {
431: entry = (Entry) connections.remove(mbr);
432: if (!previous_members.contains(mbr))
433: previous_members.add(mbr);
434: }
435: if (entry != null) {
436: entry.reset();
437: if (log.isTraceEnabled())
438: log.trace(local_addr + ": removed connection for dst "
439: + mbr);
440: return true;
441: } else
442: return false;
443: }
444:
445: private void removeAllConnections() {
446: Entry entry;
447:
448: synchronized (connections) {
449: for (Iterator it = connections.values().iterator(); it
450: .hasNext();) {
451: entry = (Entry) it.next();
452: entry.reset();
453: }
454: connections.clear();
455: }
456: }
457:
458: /** Called by AckSenderWindow to resend messages for which no ACK has been received yet */
459: public void retransmit(long seqno, Message msg) {
460: Object dst = msg.getDest();
461:
462: // bela Dec 23 2002:
463: // this will remove a member on a MERGE request, e.g. A and B merge: when A sends the unicast
464: // request to B and there's a retransmit(), B will be removed !
465:
466: // if(use_gms && !members.contains(dst) && !prev_members.contains(dst)) {
467: //
468: // if(log.isWarnEnabled()) log.warn("UNICAST.retransmit()", "seqno=" + seqno + ": dest " + dst +
469: // " is not member any longer; removing entry !");
470:
471: // synchronized(connections) {
472: // removeConnection(dst);
473: // }
474: // return;
475: // }
476:
477: if (log.isTraceEnabled())
478: log.trace("[" + local_addr + "] --> XMIT(" + dst + ": #"
479: + seqno + ')');
480:
481: if (Global.copy)
482: passDown(new Event(Event.MSG, msg.copy()));
483: else
484: passDown(new Event(Event.MSG, msg));
485: num_xmit_requests_received++;
486: }
487:
488: /**
489: * Check whether the hashtable contains an entry e for <code>sender</code> (create if not). If
490: * e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
491: * add message. Set e.received_msgs to the new window. Else just add the message.
492: * @return boolean True if we can send an ack, false otherwise
493: */
494: private boolean handleDataReceived(Object sender, long seqno,
495: Message msg) {
496: if (log.isTraceEnabled())
497: log.trace(new StringBuffer().append(local_addr).append(
498: " <-- DATA(").append(sender).append(": #").append(
499: seqno));
500:
501: if (previous_members.contains(sender)) {
502: // we don't want to see messages from departed members
503: if (seqno > DEFAULT_FIRST_SEQNO) {
504: if (log.isTraceEnabled())
505: log.trace("discarding message " + seqno
506: + " from previous member " + sender);
507: return false; // don't ack this message so the sender keeps resending it !
508: }
509: if (log.isTraceEnabled())
510: log
511: .trace("removed "
512: + sender
513: + " from previous_members as we received a message from it");
514: previous_members.removeElement(sender);
515: }
516:
517: Entry entry;
518: AckReceiverWindow win;
519: synchronized (connections) {
520: entry = (Entry) connections.get(sender);
521: if (entry == null) {
522: entry = new Entry();
523: connections.put(sender, entry);
524: if (log.isTraceEnabled())
525: log.trace(local_addr
526: + ": created new connection for dst "
527: + sender);
528: }
529: win = entry.received_msgs;
530: if (win == null) {
531: win = new AckReceiverWindow(DEFAULT_FIRST_SEQNO);
532: entry.received_msgs = win;
533: }
534: }
535:
536: win.add(seqno, msg); // entry.received_msgs is guaranteed to be non-null if we get here
537: num_msgs_received++;
538: num_bytes_received += msg.getLength();
539:
540: // Try to remove (from the AckReceiverWindow) as many messages as possible as pass them up
541: Message m;
542:
543: // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
544: // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
545: // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time
546: // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
547: // delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered only in the
548: // order in which they were sent by their senders
549: synchronized (win) {
550: while ((m = win.remove()) != null)
551: passUp(new Event(Event.MSG, m));
552: }
553: return true; // msg was successfully received - send an ack back to the sender
554: }
555:
556: /** Add the ACK to hashtable.sender.sent_msgs */
557: private void handleAckReceived(Object sender, long seqno) {
558: Entry entry;
559: AckSenderWindow win;
560:
561: if (log.isTraceEnabled())
562: log.trace(new StringBuffer().append(local_addr).append(
563: " <-- ACK(").append(sender).append(": #").append(
564: seqno).append(')'));
565: synchronized (connections) {
566: entry = (Entry) connections.get(sender);
567: }
568: if (entry == null || entry.sent_msgs == null)
569: return;
570: win = entry.sent_msgs;
571: win.ack(seqno); // removes message from retransmission
572: num_acks_received++;
573: }
574:
575: private void sendAck(Address dst, long seqno) {
576: Message ack = new Message(dst);
577: ack
578: .putHeader(name, new UnicastHeader(UnicastHeader.ACK,
579: seqno));
580: if (log.isTraceEnabled())
581: log.trace(new StringBuffer().append(local_addr).append(
582: " --> ACK(").append(dst).append(": #")
583: .append(seqno).append(')'));
584: passDown(new Event(Event.MSG, ack));
585: num_acks_sent++;
586: }
587:
588: public static class UnicastHeader extends Header implements
589: Streamable {
590: public static final byte DATA = 0;
591: public static final byte ACK = 1;
592:
593: byte type = DATA;
594: long seqno = 0;
595:
596: static final long serialized_size = Global.BYTE_SIZE
597: + Global.LONG_SIZE;
598:
599: public UnicastHeader() {
600: } // used for externalization
601:
602: public UnicastHeader(byte type, long seqno) {
603: this .type = type;
604: this .seqno = seqno;
605: }
606:
607: public String toString() {
608: return "[UNICAST: " + type2Str(type) + ", seqno=" + seqno
609: + ']';
610: }
611:
612: public static String type2Str(byte t) {
613: switch (t) {
614: case DATA:
615: return "DATA";
616: case ACK:
617: return "ACK";
618: default:
619: return "<unknown>";
620: }
621: }
622:
623: public final long size() {
624: return serialized_size;
625: }
626:
627: public void writeExternal(ObjectOutput out) throws IOException {
628: out.writeByte(type);
629: out.writeLong(seqno);
630: }
631:
632: public void readExternal(ObjectInput in) throws IOException,
633: ClassNotFoundException {
634: type = in.readByte();
635: seqno = in.readLong();
636: }
637:
638: public void writeTo(DataOutputStream out) throws IOException {
639: out.writeByte(type);
640: out.writeLong(seqno);
641: }
642:
643: public void readFrom(DataInputStream in) throws IOException,
644: IllegalAccessException, InstantiationException {
645: type = in.readByte();
646: seqno = in.readLong();
647: }
648: }
649:
650: private static final class Entry {
651: AckReceiverWindow received_msgs = null; // stores all msgs rcvd by a certain peer in seqno-order
652: AckSenderWindow sent_msgs = null; // stores (and retransmits) msgs sent by us to a certain peer
653: long sent_msgs_seqno = DEFAULT_FIRST_SEQNO; // seqno for msgs sent by us
654:
655: void reset() {
656: if (sent_msgs != null)
657: sent_msgs.reset();
658: if (received_msgs != null)
659: received_msgs.reset();
660: sent_msgs_seqno = DEFAULT_FIRST_SEQNO;
661: }
662:
663: public String toString() {
664: StringBuffer sb = new StringBuffer();
665: if (sent_msgs != null)
666: sb.append("sent_msgs=").append(sent_msgs).append('\n');
667: if (received_msgs != null)
668: sb.append("received_msgs=").append(received_msgs)
669: .append('\n');
670: return sb.toString();
671: }
672: }
673:
674: }
|