001: // $Id: MERGE.java,v 1.13.2.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.stack.RouterStub;
008: import org.jgroups.util.Util;
009:
010: import java.io.IOException;
011: import java.io.ObjectInput;
012: import java.io.ObjectOutput;
013: import java.util.*;
014:
015: /**
016: * Simple and stupid MERGE protocol (does not take into account state transfer).
017: * Periodically mcasts a HELLO message with its own address. When a HELLO message is
018: * received from a member that has the same group (UDP discards all messages with a group
019: * name different that our own), but is not currently in the group, a MERGE event is sent
020: * up the stack. The protocol starts working upon receiving a View in which it is the coordinator.
021: *
022: * @author Gianluca Collot, Jan 2001
023: */
024: public class MERGE extends Protocol implements Runnable {
025: final Vector members = new Vector();
026: Address local_addr = null;
027: String group_addr = null;
028: final String groupname = null;
029: Thread hello_thread = null; // thread that periodically mcasts HELLO messages
030: long timeout = 5000; // timeout between mcasting of HELLO messages
031:
032: String router_host = null;
033: int router_port = 0;
034:
035: RouterStub client = null;
036: boolean is_server = false;
037: boolean is_coord = false;
038: boolean merging = false;
039:
040: public String getName() {
041: return "MERGE";
042: }
043:
044: public boolean setProperties(Properties props) {
045: String str;
046:
047: super .setProperties(props);
048: str = props.getProperty("timeout"); // max time to wait for initial members
049: if (str != null) {
050: timeout = Long.parseLong(str);
051: props.remove("timeout");
052: }
053:
054: str = props.getProperty("router_host"); // host to send gossip queries (if gossip enabled)
055: if (str != null) {
056: router_host = str;
057: props.remove("router_host");
058: }
059:
060: str = props.getProperty("router_port");
061: if (str != null) {
062: router_port = Integer.parseInt(str);
063: props.remove("router_port");
064: }
065:
066: if (router_host != null && router_port != 0)
067: client = new RouterStub(router_host, router_port);
068:
069: if (props.size() > 0) {
070: log.error("the following properties are not recognized: "
071: + props);
072: return false;
073: }
074: return true;
075: }
076:
077: public void start() throws Exception {
078: if (hello_thread == null) {
079: hello_thread = new Thread(this , "MERGE Thread");
080: hello_thread.setDaemon(true);
081: hello_thread.start();
082: }
083: }
084:
085: public void stop() {
086: Thread tmp = null;
087: if (hello_thread != null && hello_thread.isAlive()) {
088: tmp = hello_thread;
089: hello_thread = null;
090: tmp.interrupt();
091: try {
092: tmp.join(1000);
093: } catch (Exception ex) {
094: }
095: }
096: hello_thread = null;
097: }
098:
099: public void up(Event evt) {
100: Message msg;
101: Object obj;
102: MergeHeader hdr;
103: Address sender;
104: boolean contains;
105: Vector tmp;
106:
107: switch (evt.getType()) {
108:
109: case Event.MSG:
110: msg = (Message) evt.getArg();
111: obj = msg.getHeader(getName());
112: if (obj == null || !(obj instanceof MergeHeader)) {
113: passUp(evt);
114: return;
115: }
116: hdr = (MergeHeader) msg.removeHeader(getName());
117:
118: switch (hdr.type) {
119:
120: case MergeHeader.HELLO: // if coord: handle, else: discard
121: if (!is_server || !is_coord) {
122: return;
123: }
124: if (merging) {
125: return;
126: }
127: sender = msg.getSrc();
128: if ((sender != null) && (members.size() >= 0)) {
129: synchronized (members) {
130: contains = members.contains(sender);
131: }
132: //merge only with lower addresses :prevents cycles and ensures that the new coordinator is correct.
133: if (!contains && sender.compareTo(local_addr) < 0) {
134: if (log.isInfoEnabled())
135: log.info("membership " + members
136: + " does not contain " + sender
137: + "; merging it");
138: tmp = new Vector();
139: tmp.addElement(sender);
140: merging = true;
141: passUp(new Event(Event.MERGE, tmp));
142: }
143: }
144: return;
145:
146: default:
147: if (log.isErrorEnabled())
148: log.error("got MERGE hdr with unknown type ("
149: + hdr.type + ')');
150: return;
151: }
152:
153: case Event.SET_LOCAL_ADDRESS:
154: local_addr = (Address) evt.getArg();
155: passUp(evt);
156: break;
157:
158: default:
159: passUp(evt); // Pass up to the layer above us
160: break;
161: }
162: }
163:
164: public void down(Event evt) {
165:
166: switch (evt.getType()) {
167:
168: case Event.TMP_VIEW:
169: passDown(evt);
170: break;
171:
172: case Event.MERGE_DENIED:
173: merging = false;
174: passDown(evt);
175: break;
176:
177: case Event.VIEW_CHANGE:
178: merging = false;
179: synchronized (members) {
180: members.clear();
181: members.addAll(((View) evt.getArg()).getMembers());
182: if ((members == null) || (members.size() == 0)) {
183: if (log.isFatalEnabled())
184: log
185: .fatal("received VIEW_CHANGE with null or empty vector");
186: System.exit(6);
187: }
188: }
189: is_coord = members.elementAt(0).equals(local_addr);
190: passDown(evt);
191: if (is_coord) {
192: if (log.isInfoEnabled())
193: log.info("start sending Hellos");
194: try {
195: start();
196: } catch (Exception ex) {
197: if (log.isWarnEnabled())
198: log.warn("exception calling start(): " + ex);
199: }
200: } else {
201: if (log.isInfoEnabled())
202: log.info("stop sending Hellos");
203: stop();
204: }
205: break;
206:
207: case Event.BECOME_SERVER: // called after client has join and is fully working group member
208: passDown(evt);
209: try {
210: start();
211: is_server = true;
212: } catch (Exception ex) {
213: if (log.isWarnEnabled())
214: log.warn("exception calling start(): " + ex);
215: }
216: break;
217:
218: case Event.CONNECT:
219: group_addr = (String) evt.getArg();
220: passDown(evt);
221: break;
222:
223: case Event.DISCONNECT:
224: if (local_addr != null && evt.getArg() != null
225: && local_addr.equals(evt.getArg()))
226: stop();
227: passDown(evt);
228: break;
229:
230: default:
231: passDown(evt); // Pass on to the layer below us
232: break;
233: }
234: }
235:
236: /**
237: * If IP multicast: periodically mcast a HELLO message
238: * If gossiping: periodically retrieve the membership. Any members not part of our
239: * own membership are merged (passing MERGE event up).
240: */
241: public void run() {
242: Message hello_msg;
243: MergeHeader hdr;
244: List rsps;
245: Vector members_to_merge = new Vector(), tmp;
246: Object mbr;
247:
248: try {
249: Thread.sleep(3000);
250: } /// initial sleep; no premature merging
251: catch (Exception e) {
252: }
253:
254: while (hello_thread != null) {
255: Util.sleep(timeout);
256: if (hello_thread == null)
257: break;
258:
259: if (client == null) { // plain IP MCAST
260: hello_msg = new Message(null);
261: hdr = new MergeHeader(MergeHeader.HELLO);
262: hello_msg.putHeader(getName(), hdr);
263: passDown(new Event(Event.MSG, hello_msg));
264: } else { // gossiping; contact Router
265: rsps = client.get(group_addr);
266:
267: synchronized (members) {
268: members_to_merge.removeAllElements();
269: for (Iterator it = rsps.iterator(); it.hasNext();) {
270: mbr = it.next();
271: if (!members.contains(mbr)) {
272: if (log.isInfoEnabled())
273: log.info("membership " + members
274: + " does not contain " + mbr
275: + "; merging it");
276: members_to_merge.addElement(mbr);
277: }
278: }
279: if (members_to_merge.size() > 0) {
280: Membership new_membership = new Membership(
281: members_to_merge);
282: new_membership.sort();
283: Address coord = (Address) new_membership
284: .elementAt(0);
285: tmp = new Vector();
286: tmp.addElement(coord);
287: if (coord.compareTo(local_addr) < 0)
288: passUp(new Event(Event.MERGE, tmp));
289: }
290: }
291: }
292: }
293: }
294:
295: /* -------------------------- Private methods ---------------------------- */
296:
297: public static class MergeHeader extends Header {
298: public static final int HELLO = 1; // arg = null
299:
300: public int type = 0;
301:
302: public MergeHeader() {
303: } // used for externalization
304:
305: public MergeHeader(int type) {
306: this .type = type;
307: }
308:
309: public String toString() {
310: return "[MERGE: type=" + type2Str(type) + ']';
311: }
312:
313: String type2Str(int t) {
314: switch (t) {
315: case HELLO:
316: return "HELLO";
317: default:
318: return "<unkown type (" + t + ")>";
319: }
320: }
321:
322: public void writeExternal(ObjectOutput out) throws IOException {
323: out.writeInt(type);
324: }
325:
326: public void readExternal(ObjectInput in) throws IOException,
327: ClassNotFoundException {
328: type = in.readInt();
329: }
330: }
331:
332: }
|