001: // $Id: GetStateTest.java,v 1.10 2006/04/23 12:52:54 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.util.Util;
007:
008: /**
009: * Example of the STATE_TRANSFER protocol and API. The member periodically updates a shared state
010: * (consisting of an array of length 3). New members join the group and fetch the current state before
011: * they become operational. Existing members do not stop while new members fetch the group state.
012: */
013: public class GetStateTest implements Runnable {
014: Channel channel;
015: int[] state; // dice results, it *is* serializable !
016: Thread getter = null;
017: boolean rc = false;
018:
019: public void start() throws Exception {
020: //String props="UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:"+
021: // "STATE_TRANSFER:QUEUE";
022: String props = "UDP(mcast_addr=224.0.0.35;mcast_port=45566;ip_ttl=32;"
023: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
024: + "PING(timeout=2000;num_initial_members=3):"
025: + "MERGE2(min_interval=5000;max_interval=10000):"
026: + "FD_SOCK:"
027: + "VERIFY_SUSPECT(timeout=1500):"
028: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
029: + "UNICAST(timeout=5000):"
030: + "pbcast.STABLE(desired_avg_gossip=20000):"
031: + "FRAG(frag_size=4096;down_thread=false;up_thread=false):"
032: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
033: + "shun=false;print_local_addr=true):"
034: + "pbcast.STATE_TRANSFER";
035:
036: channel = new JChannel(props);
037: channel.connect("TestChannel");
038:
039: System.out.println("Getting state");
040: rc = channel.getState(null, 3000);
041: System.out.println("getState(), rc=" + rc);
042: if (rc == false) {
043: state = new int[3];
044: state[0] = 1;
045: state[1] = 2;
046: state[2] = 3;
047: }
048:
049: System.out.println("State is\n" + printState(state));
050: Util.sleep(2000);
051:
052: getter = new Thread(this , "Getter");
053: getter.start();
054:
055: while (true) {
056: Message update = new Message(null);
057: int index = (int) ((Math.random() * 10) % 3);
058:
059: try {
060: update.setBuffer(Util.objectToByteBuffer(new Integer(
061: index)));
062: } catch (Exception e) {
063: System.err.println(e);
064: }
065: System.out.println("Sending update for index " + index);
066: channel.send(update);
067: Util.sleep(2000);
068: }
069:
070: }
071:
072: public void run() {
073: Object ret;
074:
075: try {
076: while (true) {
077: ret = channel.receive(0);
078:
079: if (ret instanceof Message) {
080: Message m = (Message) ret;
081: Integer index;
082: int in;
083:
084: try {
085: index = (Integer) m.getObject();
086: in = index.intValue();
087:
088: if (state != null) {
089: System.out.println("state[" + in + "]="
090: + (state[in] + 1));
091: state[index.intValue()]++;
092: }
093: } catch (ClassCastException cast_ex) {
094: System.out
095: .println("Contents of buffer was no Integer !");
096: } catch (Exception e) {
097: // System.err.println(e);
098: }
099:
100: } else if (ret instanceof GetStateEvent) {
101: System.out.println("----> State transfer: " + ret);
102: channel.returnState(Util.objectToByteBuffer(state));
103: } else if (ret instanceof SetStateEvent) {
104: Object new_state = Util
105: .objectFromByteBuffer(((SetStateEvent) ret)
106: .getArg());
107: if (new_state != null)
108: state = (int[]) new_state;
109: }
110: }
111: } catch (Exception e) {
112: }
113: }
114:
115: String printState(int[] vec) {
116: StringBuffer ret = new StringBuffer();
117: if (vec != null)
118: for (int i = 0; i < vec.length; i++)
119: ret.append("state[" + i + "]: " + vec[i] + '\n');
120: return ret.toString();
121: }
122:
123: public static void main(String[] args) {
124: try {
125: new GetStateTest().start();
126: } catch (Exception e) {
127: System.err.println(e);
128: }
129: }
130:
131: }
|