001: // $Id: SMACK.java,v 1.14.6.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.AckMcastSenderWindow;
007: import org.jgroups.stack.AckReceiverWindow;
008: import org.jgroups.stack.Protocol;
009: import org.jgroups.util.Streamable;
010: import org.jgroups.util.Util;
011:
012: import java.io.*;
013: import java.util.HashMap;
014: import java.util.Iterator;
015: import java.util.Properties;
016: import java.util.Vector;
017:
018: /**
019: * Simple Multicast ACK protocol. A positive acknowledgment-based protocol for reliable delivery of
020: * multicast messages, which does not need any group membership service.
021: * Basically works as follows:
022: * <ul>
023: * <li>Sender S sends multicast message M</li>
024: * <li>When member P receives M, it sends back a unicast ack to S</li>
025: * <li>When S receives the ack from P, it checks whether P is in its
026: * membership list. If not, P will be added. This is necessary to retransmit the next message
027: * sent to P.</li>
028: * <li>When S sends a multicast message M, all members are added to a
029: * retransmission entry (containing all members to which the message
030: * was sent), which is added to a hashmap (keyed by seqno). Whenever
031: * an ack is received from receiver X, X will be removed from the
032: * retransmission list for the given seqno. When the retransmission
033: * list is empty, the seqno will be removed from the hashmap.</li>
034: * <li>A retransmitter thread in the sender periodically retransmits
035: * (either via unicast, or multicast) messages for which no ack has
036: * been received yet</li>
037: * <li>When a max number of (unsuccessful) retransmissions have been
038: * exceeded, all remaining members for that seqno are removed from
039: * the local membership, and the seqno is removed from the hashmap,
040: * ceasing all retransmissions</li>
041: * </ul>
042: * Advantage of this protocol: no group membership necessary, fast.
043: * @author Bela Ban Aug 2002
044: * @version $Revision: 1.14.6.1 $
045: * <BR> Fix membership bug: start a, b, kill b, restart b: b will be suspected by a.
046: */
047: public class SMACK extends Protocol implements
048: AckMcastSenderWindow.RetransmitCommand {
049: long[] timeout = { 1000, 2000, 3000 }; // retransmit timeouts (for AckMcastSenderWindow)
050: int max_xmits = 10; // max retransmissions (if still no ack, member will be removed)
051: final Vector members = new Vector(); // contains Addresses
052: AckMcastSenderWindow sender_win = null;
053: final HashMap receivers = new HashMap(); // keys=sender (Address), values=AckReceiverWindow
054: final HashMap xmit_table = new HashMap(); // keeps track of num xmits / member (keys: mbr, val:num)
055: Address local_addr = null; // my own address
056: long seqno = 1; // seqno for msgs sent by this sender
057: long vid = 1; // for the fake view changes
058: boolean print_local_addr = true;
059: static final String name = "SMACK";
060:
061: public SMACK() {
062: }
063:
064: public String getName() {
065: return name;
066: }
067:
068: public boolean setProperties(Properties props) {
069: String str;
070: long[] tmp;
071:
072: super .setProperties(props);
073: str = props.getProperty("print_local_addr");
074: if (str != null) {
075: print_local_addr = Boolean.valueOf(str).booleanValue();
076: props.remove("print_local_addr");
077: }
078:
079: str = props.getProperty("timeout");
080: if (str != null) {
081: tmp = Util.parseCommaDelimitedLongs(str);
082: props.remove("timeout");
083: if (tmp != null && tmp.length > 0)
084: timeout = tmp;
085: }
086:
087: str = props.getProperty("max_xmits");
088: if (str != null) {
089: max_xmits = Integer.parseInt(str);
090: props.remove("max_xmits");
091: }
092:
093: if (props.size() > 0) {
094: log
095: .error("SMACK.setProperties(): the following properties are not recognized: "
096: + props);
097:
098: return false;
099: }
100: return true;
101: }
102:
103: public void stop() {
104: AckReceiverWindow win;
105: if (sender_win != null) {
106: sender_win.stop();
107: sender_win = null;
108: }
109: for (Iterator it = receivers.values().iterator(); it.hasNext();) {
110: win = (AckReceiverWindow) it.next();
111: win.reset();
112: }
113: receivers.clear();
114: }
115:
116: public void up(Event evt) {
117: Address sender;
118:
119: switch (evt.getType()) {
120:
121: case Event.SET_LOCAL_ADDRESS:
122: local_addr = (Address) evt.getArg();
123: addMember(local_addr);
124: if (print_local_addr) {
125: System.out
126: .println("\n-------------------------------------------------------\n"
127: + "GMS: address is "
128: + local_addr
129: + "\n-------------------------------------------------------");
130: }
131: break;
132:
133: case Event.CONNECT_OK:
134: passUp(evt);
135: sender_win = new AckMcastSenderWindow(this , timeout);
136:
137: // send join announcement
138: Message join_msg = new Message();
139: join_msg.putHeader(name, new SmackHeader(
140: SmackHeader.JOIN_ANNOUNCEMENT, -1));
141: passDown(new Event(Event.MSG, join_msg));
142: return;
143:
144: case Event.SUSPECT:
145:
146: if (log.isInfoEnabled())
147: log.info("removing suspected member " + evt.getArg());
148: removeMember((Address) evt.getArg());
149: break;
150:
151: case Event.MSG:
152: Message msg = (Message) evt.getArg(),
153: tmp_msg;
154: if (msg == null)
155: break;
156: sender = msg.getSrc();
157: SmackHeader hdr = (SmackHeader) msg.removeHeader(name);
158: if (hdr == null) // is probably a unicast message
159: break;
160: switch (hdr.type) {
161: case SmackHeader.MCAST: // send an ack, then pass up (if not already received)
162: Long tmp_seqno;
163: AckReceiverWindow win;
164: Message ack_msg = new Message(sender);
165:
166: ack_msg.putHeader(name, new SmackHeader(
167: SmackHeader.ACK, hdr.seqno));
168: passDown(new Event(Event.MSG, ack_msg));
169:
170: tmp_seqno = new Long(hdr.seqno);
171:
172: if (log.isTraceEnabled())
173: log.trace("received #" + tmp_seqno + " from "
174: + sender);
175:
176: win = (AckReceiverWindow) receivers.get(sender);
177: if (win == null) {
178: addMember(sender);
179: win = new AckReceiverWindow(hdr.seqno);
180: receivers.put(sender, win);
181: }
182: win.add(hdr.seqno, msg);
183:
184: // now remove as many messages as possible
185: while ((tmp_msg = win.remove()) != null)
186: passUp(new Event(Event.MSG, tmp_msg));
187: return;
188:
189: case SmackHeader.ACK:
190: addMember(msg.getSrc());
191: sender_win.ack(hdr.seqno, msg.getSrc());
192: sender_win.clearStableMessages();
193: if (log.isTraceEnabled())
194: log.trace("received ack for #" + hdr.seqno
195: + " from " + msg.getSrc());
196: return;
197:
198: case SmackHeader.JOIN_ANNOUNCEMENT:
199:
200: if (log.isInfoEnabled())
201: log.info("received join announcement by "
202: + msg.getSrc());
203:
204: if (!containsMember(sender)) {
205: Message join_rsp = new Message(sender);
206: join_rsp.putHeader(name, new SmackHeader(
207: SmackHeader.JOIN_ANNOUNCEMENT, -1));
208: passDown(new Event(Event.MSG, join_rsp));
209: }
210: addMember(sender);
211: return;
212:
213: case SmackHeader.LEAVE_ANNOUNCEMENT:
214:
215: if (log.isInfoEnabled())
216: log.info("received leave announcement by "
217: + msg.getSrc());
218:
219: removeMember(sender);
220: return;
221:
222: default:
223: if (log.isWarnEnabled())
224: log.warn("detected SmackHeader with invalid type: "
225: + hdr);
226: break;
227: }
228: break;
229: }
230:
231: passUp(evt);
232: }
233:
234: public void down(Event evt) {
235: Message leave_msg;
236:
237: switch (evt.getType()) {
238:
239: case Event.DISCONNECT:
240: leave_msg = new Message();
241: leave_msg.putHeader(name, new SmackHeader(
242: SmackHeader.LEAVE_ANNOUNCEMENT, -1));
243: passDown(new Event(Event.MSG, leave_msg));
244: // passUp(new Event(Event.DISCONNECT_OK));
245: break;
246:
247: case Event.CONNECT:
248: //passUp(new Event(Event.CONNECT_OK));
249:
250: // Do not send JOIN_ANOUNCEMENT here, don't know yet if the transport is OK.
251: // Send it later when handling CONNECT_OK from below
252:
253: // sender_win=new AckMcastSenderWindow(this, timeout);
254: // // send join announcement
255: // Message join_msg=new Message();
256: // join_msg.putHeader(name, new SmackHeader(SmackHeader.JOIN_ANNOUNCEMENT, -1));
257: // passDown(new Event(Event.MSG, join_msg));
258: // return;
259:
260: break;
261:
262: // add a header with the current sequence number and increment seqno
263: case Event.MSG:
264: Message msg = (Message) evt.getArg();
265: if (msg == null)
266: break;
267: if (msg.getDest() == null
268: || msg.getDest().isMulticastAddress()) {
269: msg.putHeader(name, new SmackHeader(SmackHeader.MCAST,
270: seqno));
271: sender_win.add(seqno, msg, (Vector) members.clone());
272: if (log.isTraceEnabled())
273: log.trace("sending mcast #" + seqno);
274: seqno++;
275: }
276: break;
277: }
278:
279: passDown(evt);
280: }
281:
282: /* ----------------------- Interface AckMcastSenderWindow.RetransmitCommand -------------------- */
283:
284: public void retransmit(long seqno, Message msg, Address dest) {
285: msg.setDest(dest);
286:
287: if (log.isInfoEnabled())
288: log.info(seqno + ", msg=" + msg);
289: passDown(new Event(Event.MSG, msg));
290: }
291:
292: /* -------------------- End of Interface AckMcastSenderWindow.RetransmitCommand ---------------- */
293:
294: public static class SmackHeader extends Header implements
295: Streamable {
296: public static final byte MCAST = 1;
297: public static final byte ACK = 2;
298: public static final byte JOIN_ANNOUNCEMENT = 3;
299: public static final byte LEAVE_ANNOUNCEMENT = 4;
300:
301: byte type = 0;
302: long seqno = -1;
303:
304: public SmackHeader() {
305: }
306:
307: public SmackHeader(byte type, long seqno) {
308: this .type = type;
309: this .seqno = seqno;
310: }
311:
312: public void writeExternal(ObjectOutput out) throws IOException {
313: out.writeByte(type);
314: out.writeLong(seqno);
315: }
316:
317: public void readExternal(ObjectInput in) throws IOException,
318: ClassNotFoundException {
319: type = in.readByte();
320: seqno = in.readLong();
321: }
322:
323: public long size() {
324: return Global.LONG_SIZE + Global.BYTE_SIZE;
325: }
326:
327: public void writeTo(DataOutputStream out) throws IOException {
328: out.writeByte(type);
329: out.writeLong(seqno);
330: }
331:
332: public void readFrom(DataInputStream in) throws IOException,
333: IllegalAccessException, InstantiationException {
334: type = in.readByte();
335: seqno = in.readLong();
336: }
337:
338: public String toString() {
339: switch (type) {
340: case MCAST:
341: return "MCAST";
342: case ACK:
343: return "ACK";
344: case JOIN_ANNOUNCEMENT:
345: return "JOIN_ANNOUNCEMENT";
346: case LEAVE_ANNOUNCEMENT:
347: return "LEAVE_ANNOUNCEMENT";
348: default:
349: return "<unknown>";
350: }
351: }
352: }
353:
354: /* ------------------------------------- Private methods --------------------------------------- */
355: void addMember(Address mbr) {
356: synchronized (members) {
357: if (mbr != null && !members.contains(mbr)) {
358: Object tmp;
359: View new_view;
360: members.addElement(mbr);
361: tmp = members.clone();
362: if (log.isTraceEnabled())
363: log.trace("added " + mbr + ", members=" + tmp);
364: new_view = new View(new ViewId(local_addr, vid++),
365: (Vector) tmp);
366: passUp(new Event(Event.VIEW_CHANGE, new_view));
367: passDown(new Event(Event.VIEW_CHANGE, new_view));
368: }
369: }
370: }
371:
372: void removeMember(Address mbr) {
373: synchronized (members) {
374: if (mbr != null) {
375: Object tmp;
376: View new_view;
377: members.removeElement(mbr);
378: tmp = members.clone();
379: if (log.isTraceEnabled())
380: log.trace("removed " + mbr + ", members=" + tmp);
381: new_view = new View(new ViewId(local_addr, vid++),
382: (Vector) tmp);
383: passUp(new Event(Event.VIEW_CHANGE, new_view));
384: passDown(new Event(Event.VIEW_CHANGE, new_view));
385: if (sender_win != null)
386: sender_win.remove(mbr); // causes retransmissions to mbr to stop
387: }
388: }
389: }
390:
391: boolean containsMember(Address mbr) {
392: synchronized (members) {
393: return mbr != null && members.contains(mbr);
394: }
395: }
396:
397: /* --------------------------------- End of Private methods ------------------------------------ */
398:
399: }
|