001: // $Id: Gossip.java,v 1.9 2005/10/31 10:56:31 belaban Exp $
002:
003: package org.jgroups.demos;
004:
import org.jgroups.*;
import org.jgroups.util.Util;

import javax.swing.*;
import java.awt.*;
import java.awt.event.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Random;
import java.util.Vector;
016:
017: /**
018: * Demos that tries to graphically illustrating the gossip (or pbcast) protocol: every sender periodically sends
019: * a DRAW command to a random subset of the group members. Each member checks whether it already received the
020: * message and applies it if not yet received. Otherwise it discards it. If not yet received, the message will
021: * be forwarded to 10% of the group members. This demo is probably only interesting when we have a larger
022: * number of members: a gossip will gradually reach all members, coloring their whiteboards.
023: */
024: public class Gossip implements Runnable, WindowListener,
025: ActionListener, ChannelListener {
026: private Graphics graphics = null;
027: private Frame mainFrame = null;
028: private JPanel panel = null, sub_panel = null;
029: private final ByteArrayOutputStream out = new ByteArrayOutputStream();
030: private final Random random = new Random(System.currentTimeMillis());
031: private Button gossip_button, clear_button, leave_button;
032: private final Font default_font = new Font("Helvetica", Font.PLAIN,
033: 12);
034: private final String groupname = "GossipGroupDemo";
035: private Channel channel = null;
036: private Thread receiver = null;
037: private int member_size = 1;
038: private final Vector members = new Vector();
039: private int red = 0, green = 0, blue = 0;
040: private Color default_color = null;
041: boolean first = true;
042: final double subset = 0.1;
043: Address local_addr = null;
044: TrafficGenerator gen = null;
045: long traffic_interval = 0;
046:
047: public Gossip(String props, long traffic) throws Exception {
048:
049: channel = new JChannel(props);
050: channel.addChannelListener(this );
051: channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
052: traffic_interval = traffic;
053: if (traffic_interval > 0) {
054: gen = new TrafficGenerator();
055: gen.start();
056: }
057: }
058:
059: public static void main(String[] args) {
060: Gossip gossip = null;
061: String props = null;
062: long traffic = 0;
063:
064: for (int i = 0; i < args.length; i++) {
065: if ("-help".equals(args[i])) {
066: System.out
067: .println("Gossip [-traffic_interval <interval in msecs>] [-help]");
068: return;
069: }
070: if ("-traffic_interval".equals(args[i])) {
071: traffic = Long.parseLong(args[++i]);
072: continue;
073: }
074: }
075:
076: // props="UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:PERF(trace=;details=true)";
077:
078: /**
079: props="TCP(start_port=8000):" +
080: "TCPPING(num_initial_members=1;timeout=3000;port_range=2;"+
081: "initial_hosts=daddy[8000],terrapin[8000],sindhu[8000]):" +
082: "FD:" +
083: "pbcast.PBCAST(gossip_interval=5000;gc_lag=50):" +
084: "UNICAST:" +
085: "FRAG:" +
086: "pbcast.GMS";
087: // "PERF(trace=true;details=true)";
088: **/
089:
090: props = "UDP(mcast_addr=224.10.10.100;mcast_port=5678;ip_ttl=32):"
091: + "PING:"
092: +
093: // "FD(shun=true;timeout=5000):" +
094: "pbcast.FD(timeout=3000):"
095: + "VERIFY_SUSPECT(timeout=2000;num_msgs=2):"
096: + "pbcast.PBCAST(desired_avg_gossip=8000;mcast_gossip=true;gc_lag=30;max_queue=20):"
097: + "UNICAST:" + "FRAG:" + "pbcast.GMS"; // :" + // ;join_timeout=20):" +
098: // "PERF(trace=true;details=true)";
099:
100: try {
101: gossip = new Gossip(props, traffic);
102: gossip.go();
103: } catch (Exception e) {
104: System.err.println(e);
105: System.exit(0);
106: }
107: }
108:
109: private void selectColor() {
110: red = (Math.abs(random.nextInt()) % 255);
111: green = (Math.abs(random.nextInt()) % 255);
112: blue = (Math.abs(random.nextInt()) % 255);
113: default_color = new Color(red, green, blue);
114: }
115:
116: public void go() {
117: try {
118: channel.connect(groupname);
119: local_addr = channel.getLocalAddress();
120: startThread();
121: mainFrame = new Frame();
122: panel = new MyPanel();
123: sub_panel = new JPanel();
124: mainFrame.setSize(250, 250);
125: mainFrame.add("Center", panel);
126: clear_button = new Button("Clear");
127: clear_button.setFont(default_font);
128: clear_button.addActionListener(this );
129: gossip_button = new Button("Gossip");
130: gossip_button.setFont(default_font);
131: gossip_button.addActionListener(this );
132: leave_button = new Button("Leave & Exit");
133: leave_button.setFont(default_font);
134: leave_button.addActionListener(this );
135: sub_panel.add("South", gossip_button);
136: sub_panel.add("South", clear_button);
137: sub_panel.add("South", leave_button);
138: mainFrame.add("South", sub_panel);
139: mainFrame.addWindowListener(this );
140: mainFrame.setVisible(true);
141: setTitle();
142: graphics = panel.getGraphics();
143: graphics.setColor(default_color);
144: mainFrame.setBackground(Color.white);
145: mainFrame.pack();
146: gossip_button.setForeground(Color.blue);
147: clear_button.setForeground(Color.blue);
148: leave_button.setForeground(Color.blue);
149: } catch (Exception e) {
150: System.err.println(e);
151: return;
152: }
153: }
154:
155: void startThread() {
156: receiver = new Thread(this , "GossipThread");
157: receiver.setPriority(Thread.MAX_PRIORITY);
158: receiver.start();
159: }
160:
161: void setTitle() {
162: String title = "";
163: if (local_addr != null)
164: title += local_addr;
165: title += " (" + member_size + ") mbrs";
166: mainFrame.setTitle(title);
167: }
168:
169: public void run() {
170: Object tmp;
171: Message msg = null;
172: Command comm;
173: boolean fl = true;
174: Vector mbrs;
175: ObjectOutputStream os;
176:
177: while (fl) {
178: try {
179: tmp = channel.receive(0);
180: // System.out.println("Gossip.run(): received " + tmp);
181:
182: if (tmp == null)
183: continue;
184:
185: if (tmp instanceof View) {
186: View v = (View) tmp;
187: member_size = v.size();
188: mbrs = v.getMembers();
189: members.removeAllElements();
190: for (int i = 0; i < mbrs.size(); i++)
191: members.addElement(mbrs.elementAt(i));
192: if (mainFrame != null)
193: setTitle();
194: continue;
195: }
196:
197: if (tmp instanceof ExitEvent) {
198: // System.out.println("-- Gossip.main(): received EXIT, waiting for ChannelReconnected callback");
199: break;
200: }
201:
202: if (!(tmp instanceof Message))
203: continue;
204:
205: msg = (Message) tmp;
206: comm = null;
207:
208: Object obj = msg.getObject();
209:
210: // System.out.println("obj is " + obj);
211:
212: if (obj instanceof Command)
213: comm = (Command) obj;
214: else if (obj instanceof Message) {
215: System.out.println("*** Message is "
216: + Util.printMessage((Message) obj));
217: Util.dumpStack(true);
218: } else {
219: if (obj != null)
220: System.out.println("obj is " + obj.getClass()
221: + ", hdrs are"
222: + msg.printObjectHeaders());
223: else
224: System.out.println("hdrs are"
225: + msg.printObjectHeaders());
226: Util.dumpStack(true);
227: }
228:
229: switch (comm.mode) {
230: case Command.GOSSIP:
231: if (graphics != null) {
232: colorPanel(comm.r, comm.g, comm.b);
233: comm.not_seen.removeElement(local_addr);
234: if (comm.not_seen.size() > 0) { // forward gossip
235: Vector v = Util.pickSubset(comm.not_seen,
236: subset);
237: out.reset();
238: os = new ObjectOutputStream(out);
239: os.writeObject(comm);
240: os.flush();
241: for (int i = 0; i < v.size(); i++) {
242: channel.send(new Message((Address) v
243: .elementAt(i), null, out
244: .toByteArray()));
245: }
246: }
247: }
248: break;
249: case Command.CLEAR:
250: clearPanel();
251: continue;
252: default:
253: System.err
254: .println("***** Gossip.run(): received invalid draw command "
255: + comm.mode);
256: break;
257: }
258:
259: } catch (ChannelNotConnectedException not) {
260: System.err.println("Gossip: " + not);
261: break;
262: } catch (ChannelClosedException closed) {
263: System.err.println("Gossip: channel was closed");
264: break;
265: } catch (Exception e) {
266: System.err.println(e);
267: continue; // break;
268: }
269: }
270: }
271:
272: /* --------------- Callbacks --------------- */
273:
274: public void mouseMoved(MouseEvent e) {
275: }
276:
277: public void clearPanel() {
278: Rectangle bounds = null;
279: if (panel == null || graphics == null)
280: return;
281:
282: bounds = panel.getBounds();
283: graphics.clearRect(0, 0, bounds.width, bounds.height);
284: }
285:
286: public void colorPanel(int r, int g, int b) {
287: if (graphics != null) {
288: red = r;
289: green = g;
290: blue = b;
291: graphics.setColor(new Color(red, green, blue));
292: Rectangle bounds = panel.getBounds();
293: graphics.fillRect(0, 0, bounds.width, bounds.height);
294: graphics.setColor(default_color);
295: }
296: }
297:
298: void sendGossip() {
299: int tmp[] = new int[1];
300: tmp[0] = 0;
301: Command comm;
302: ObjectOutputStream os;
303: Vector dests = (Vector) members.clone();
304:
305: try {
306: selectColor(); // set a new randomly chosen color
307: dests.removeElement(local_addr);
308: dests = Util.pickSubset(dests, subset);
309: if (dests == null || dests.size() == 0) { // only apply new color locally
310: // System.out.println("-- local");
311: colorPanel(red, green, blue);
312: return;
313: }
314:
315: colorPanel(red, green, blue);
316: comm = new Command(Command.GOSSIP, red, green, blue);
317: comm.not_seen = (Vector) members.clone();
318: comm.not_seen.removeElement(local_addr);
319: out.reset();
320: os = new ObjectOutputStream(out);
321: os.writeObject(comm);
322: os.flush();
323: for (int i = 0; i < dests.size(); i++) {
324: channel.send(new Message((Address) dests.elementAt(i),
325: null, out.toByteArray()));
326: }
327: } catch (Exception ex) {
328: System.err.println(ex);
329: }
330: }
331:
332: public void sendClearPanelMsg() {
333: int tmp[] = new int[1];
334: tmp[0] = 0;
335: Command comm = new Command(Command.CLEAR);
336: ObjectOutputStream os;
337:
338: try {
339: out.reset();
340: os = new ObjectOutputStream(out);
341: os.writeObject(comm);
342: os.flush();
343: channel.send(new Message(null, null, out.toByteArray()));
344: } catch (Exception ex) {
345: System.err.println(ex);
346: }
347: }
348:
349: public void windowActivated(WindowEvent e) {
350: }
351:
352: public void windowClosed(WindowEvent e) {
353: }
354:
355: public void windowClosing(WindowEvent e) {
356: System.exit(0); // exit the dirty way ...
357: }
358:
359: public void windowDeactivated(WindowEvent e) {
360: }
361:
362: public void windowDeiconified(WindowEvent e) {
363: }
364:
365: public void windowIconified(WindowEvent e) {
366: }
367:
368: public void windowOpened(WindowEvent e) {
369: }
370:
371: public void actionPerformed(ActionEvent e) {
372: String command = e.getActionCommand();
373: if ("Gossip".equals(command)) {
374: sendGossip();
375: } else if ("Clear".equals(command))
376: sendClearPanelMsg();
377: else if ("Leave & Exit".equals(command)) {
378: try {
379: channel.disconnect();
380: channel.close();
381: } catch (Exception ex) {
382: System.err.println(ex);
383: }
384: mainFrame.setVisible(false);
385: System.exit(0);
386: } else
387: System.out.println("Unknown action");
388: }
389:
390: public void channelConnected(Channel channel) {
391: if (first)
392: first = false;
393: else
394: startThread();
395: }
396:
397: public void channelDisconnected(Channel channel) {
398: // System.out.println("----> channelDisconnected()");
399: }
400:
401: public void channelClosed(Channel channel) {
402: // System.out.println("----> channelClosed()");
403: }
404:
405: public void channelShunned() {
406: System.out.println("----> channelShunned()");
407: }
408:
409: public void channelReconnected(Address new_addr) {
410: System.out
411: .println("----> channelReconnected(" + new_addr + ')');
412: local_addr = new_addr;
413: }
414:
415: private static class Command implements Serializable {
416: static final int GOSSIP = 1;
417: static final int CLEAR = 2;
418: final int mode;
419: int r = 0;
420: int g = 0;
421: int b = 0;
422: Vector not_seen = new Vector();
423:
424: Command(int mode) {
425: this .mode = mode;
426: }
427:
428: Command(int mode, int r, int g, int b) {
429: this .mode = mode;
430: this .r = r;
431: this .g = g;
432: this .b = b;
433: }
434:
435: public String toString() {
436: StringBuffer ret = new StringBuffer();
437: switch (mode) {
438: case GOSSIP:
439: ret.append("GOSSIP(" + r + '|' + g + '|' + b);
440: break;
441: case CLEAR:
442: ret.append("CLEAR");
443: break;
444: default:
445: return "<undefined>";
446: }
447: ret.append(", not_seen=" + not_seen);
448: return ret.toString();
449: }
450: }
451:
452: private class TrafficGenerator implements Runnable {
453: Thread generator = null;
454:
455: public void start() {
456: if (generator == null) {
457: generator = new Thread(this , "TrafficGeneratorThread");
458: generator.start();
459: }
460: }
461:
462: public void stop() {
463: if (generator != null)
464: generator = null;
465: generator = null;
466: }
467:
468: public void run() {
469: while (generator != null) {
470: Util.sleep(traffic_interval);
471: if (generator != null)
472: sendGossip();
473: }
474: }
475: }
476:
477: private class MyPanel extends JPanel {
478: final Dimension preferred_size = new Dimension(200, 200);
479:
480: public Dimension getPreferredSize() {
481: return preferred_size;
482: }
483:
484: }
485:
486: }
|