001: // $Id: MERGE2.java,v 1.29.2.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.View;
008: import org.jgroups.Global;
009: import org.jgroups.stack.Protocol;
010: import org.jgroups.util.Promise;
011: import org.jgroups.util.Util;
012:
013: import java.util.Properties;
014: import java.util.Vector;
015:
016: /**
017: * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group
018: * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send
019: * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time
020: * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the
021: * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done
022: * somewhere above this protocol (typically in the GMS protocol).<p>
023: * This protocol works as follows:
024: * <ul>
* <li>If coordinator: periodically retrieve the initial membership (using the FIND_INITIAL_MBRS event provided e.g.
* by the PING or TCPPING protocols). This list contains {coord,addr} pairs.
027: * <li>If there is more than 1 coordinator:
028: * <ol>
029: * <li>Get all coordinators
030: * <li>Create a MERGE event with the list of coordinators as argument
031: * <li>Send the event up the stack
032: * </ol>
033: * </ul>
034: *
035: * <p>
036: *
037: * Requires: FIND_INITIAL_MBRS event from below<br>
038: * Provides: sends MERGE event with list of coordinators up the stack<br>
039: * @author Bela Ban, Oct 16 2001
040: */
041: public class MERGE2 extends Protocol {
042: Address local_addr = null;
043: String group_name = null;
044: private FindSubgroups task = null; // task periodically executing as long as we are coordinator
045: private final Object task_lock = new Object();
046: long min_interval = 5000; // minimum time between executions of the FindSubgroups task
047: long max_interval = 20000; // maximum time between executions of the FindSubgroups task
048: boolean is_coord = false;
049: final Promise find_promise = new Promise(); // to synchronize FindSubgroups.findInitialMembers() on
050:
051: /** Use a new thread to send the MERGE event up the stack */
052: boolean use_separate_thread = false;
053:
054: public String getName() {
055: return "MERGE2";
056: }
057:
058: public long getMinInterval() {
059: return min_interval;
060: }
061:
062: public void setMinInterval(long i) {
063: min_interval = i;
064: }
065:
066: public long getMaxInterval() {
067: return max_interval;
068: }
069:
070: public void setMaxInterval(long l) {
071: max_interval = l;
072: }
073:
074: public boolean setProperties(Properties props) {
075: String str;
076:
077: super .setProperties(props);
078: str = props.getProperty("min_interval");
079: if (str != null) {
080: min_interval = Long.parseLong(str);
081: props.remove("min_interval");
082: }
083:
084: str = props.getProperty("max_interval");
085: if (str != null) {
086: max_interval = Long.parseLong(str);
087: props.remove("max_interval");
088: }
089:
090: if (min_interval <= 0 || max_interval <= 0) {
091: if (log.isErrorEnabled())
092: log
093: .error("min_interval and max_interval have to be > 0");
094: return false;
095: }
096: if (max_interval <= min_interval) {
097: if (log.isErrorEnabled())
098: log
099: .error("max_interval has to be greater than min_interval");
100: return false;
101: }
102:
103: str = props.getProperty("use_separate_thread");
104: if (str != null) {
105: use_separate_thread = Boolean.valueOf(str).booleanValue();
106: props.remove("use_separate_thread");
107: }
108:
109: if (props.size() > 0) {
110: log.error("the following properties are not recognized: "
111: + props);
112: return false;
113: }
114: return true;
115: }
116:
117: public Vector requiredDownServices() {
118: Vector retval = new Vector(1);
119: retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
120: return retval;
121: }
122:
123: public void stop() {
124: is_coord = false;
125: stopTask();
126: }
127:
128: /**
129: * This prevents the up-handler thread to be created, which is not needed in the protocol.
130: * DON'T REMOVE !
131: */
132: public void startUpHandler() {
133: }
134:
135: /**
136: * This prevents the down-handler thread to be created, which is not needed in the protocol.
137: * DON'T REMOVE !
138: */
139: public void startDownHandler() {
140: }
141:
142: public void up(Event evt) {
143: switch (evt.getType()) {
144:
145: case Event.SET_LOCAL_ADDRESS:
146: local_addr = (Address) evt.getArg();
147: passUp(evt);
148: break;
149:
150: case Event.FIND_INITIAL_MBRS_OK:
151: find_promise.setResult(evt.getArg());
152: passUp(evt); // could be needed by GMS
153: break;
154:
155: default:
156: passUp(evt); // Pass up to the layer above us
157: break;
158: }
159: }
160:
161: public void down(Event evt) {
162: Vector mbrs;
163: Address coord;
164:
165: switch (evt.getType()) {
166:
167: case Event.CONNECT:
168: group_name = (String) evt.getArg();
169: passDown(evt);
170: break;
171:
172: case Event.DISCONNECT:
173: group_name = null;
174: passDown(evt);
175: break;
176:
177: case Event.VIEW_CHANGE:
178: passDown(evt);
179: mbrs = ((View) evt.getArg()).getMembers();
180: if (mbrs == null || mbrs.size() == 0 || local_addr == null) {
181: stopTask();
182: break;
183: }
184: coord = (Address) mbrs.elementAt(0);
185: if (coord.equals(local_addr)) {
186: is_coord = true;
187: startTask(); // start task if we became coordinator (doesn't start if already running)
188: } else {
189: // if we were coordinator, but are no longer, stop task. this happens e.g. when we merge and someone
190: // else becomes the new coordinator of the merged group
191: if (is_coord) {
192: is_coord = false;
193: }
194: stopTask();
195: }
196: break;
197:
198: default:
199: passDown(evt); // Pass on to the layer below us
200: break;
201: }
202: }
203:
204: /* -------------------------------------- Private Methods --------------------------------------- */
205: void startTask() {
206: synchronized (task_lock) {
207: if (task == null)
208: task = new FindSubgroups();
209: task.start();
210: if (group_name != null) {
211: String tmp, prefix = Global.THREAD_PREFIX;
212: tmp = task.getName();
213: if (tmp != null && tmp.indexOf(prefix) == -1) {
214: tmp += prefix + group_name + ")";
215: task.setName(tmp);
216: }
217: }
218: }
219: }
220:
221: void stopTask() {
222: synchronized (task_lock) {
223: if (task != null) {
224: task.stop();
225: task = null;
226: }
227: }
228: }
229:
230: /* ---------------------------------- End of Private Methods ------------------------------------ */
231:
232: /**
233: * Task periodically executing (if role is coordinator). Gets the initial membership and determines
234: * whether there are subgroups (multiple coordinators for the same group). If yes, it sends a MERGE event
235: * with the list of the coordinators up the stack
236: */
237: private class FindSubgroups implements Runnable {
238: Thread thread = null;
239:
240: String getName() {
241: return thread != null ? thread.getName() : null;
242: }
243:
244: void setName(String thread_name) {
245: if (thread != null)
246: thread.setName(thread_name);
247: }
248:
249: public void start() {
250: if (thread == null || !thread.isAlive()) {
251: thread = new Thread(Util.getGlobalThreadGroup(), this ,
252: "MERGE2.FindSubgroups thread");
253: thread.setDaemon(true);
254: thread.start();
255: }
256: }
257:
258: public void stop() {
259: if (thread != null) {
260: Thread tmp = thread;
261: thread = null;
262: tmp.interrupt(); // wakes up sleeping thread
263: find_promise.reset();
264: }
265: thread = null;
266: }
267:
268: public void run() {
269: long interval;
270: Vector coords;
271: Vector initial_mbrs;
272:
273: // if(log.isDebugEnabled()) log.debug("merge task started as I'm the coordinator");
274: while (thread != null
275: && Thread.currentThread().equals(thread)) {
276: interval = computeInterval();
277: Util.sleep(interval);
278: if (thread == null)
279: break;
280: initial_mbrs = findInitialMembers();
281: if (thread == null)
282: break;
283: if (log.isDebugEnabled())
284: log.debug("initial_mbrs=" + initial_mbrs);
285: coords = detectMultipleCoordinators(initial_mbrs);
286: if (coords != null && coords.size() > 1) {
287: if (log.isDebugEnabled())
288: log.debug("found multiple coordinators: "
289: + coords + "; sending up MERGE event");
290: final Event evt = new Event(Event.MERGE, coords);
291: if (use_separate_thread) {
292: Thread merge_notifier = new Thread() {
293: public void run() {
294: passUp(evt);
295: }
296: };
297: merge_notifier.setDaemon(true);
298: merge_notifier.setName("merge notifier thread");
299: merge_notifier.start();
300: } else {
301: passUp(evt);
302: }
303: }
304: }
305: if (log.isTraceEnabled())
306: log
307: .trace("MERGE2.FindSubgroups thread terminated (local_addr="
308: + local_addr + ")");
309: }
310:
311: /**
312: * Returns a random value within [min_interval - max_interval]
313: */
314: long computeInterval() {
315: return min_interval
316: + Util.random(max_interval - min_interval);
317: }
318:
319: /**
320: * Returns a list of PingRsp pairs.
321: */
322: Vector findInitialMembers() {
323: PingRsp tmp = new PingRsp(local_addr, local_addr, true);
324: find_promise.reset();
325: passDown(Event.FIND_INITIAL_MBRS_EVT);
326: Vector retval = (Vector) find_promise.getResult(0); // wait indefinitely until response is received
327: if (retval != null && is_coord && local_addr != null
328: && !retval.contains(tmp))
329: retval.add(tmp);
330: return retval;
331: }
332:
333: /**
334: * Finds out if there is more than 1 coordinator in the initial_mbrs vector (contains PingRsp elements).
335: * @param initial_mbrs A list of PingRsp pairs
336: * @return Vector A list of the coordinators (Addresses) found. Will contain just 1 element for a correct
337: * membership, and more than 1 for multiple coordinators
338: */
339: Vector detectMultipleCoordinators(Vector initial_mbrs) {
340: Vector ret = new Vector(11);
341: PingRsp rsp;
342: Address coord;
343:
344: if (initial_mbrs == null)
345: return null;
346: for (int i = 0; i < initial_mbrs.size(); i++) {
347: rsp = (PingRsp) initial_mbrs.elementAt(i);
348: if (!rsp.is_server)
349: continue;
350: coord = rsp.getCoordAddress();
351: if (!ret.contains(coord))
352: ret.addElement(coord);
353: }
354:
355: return ret;
356: }
357:
358: }
359:
360: }
|