001: // $Id: MERGE3.java,v 1.9.6.1 2006/12/13 14:00:12 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.util.TimeScheduler;
008: import org.jgroups.util.Util;
009:
010: import java.io.IOException;
011: import java.io.ObjectInput;
012: import java.io.ObjectOutput;
013: import java.util.*;
014:
015: /**
016: * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group
017: * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send
018: * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time
019: * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the
020: * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done
021: * somewhere above this protocol (typically in the GMS protocol).<p>
022: * This protocol works as follows:
023: * <ul>
024: * <li>If coordinator: periodically broadcast a "I'm the coordinator" message. If a coordinator receives such
025: * a message, it immediately initiates a merge by sending up a MERGE event
026: * <p>
027: *
028: * Provides: sends MERGE event with list of coordinators up the stack<br>
029: * @author Bela Ban, Oct 16 2001
030: */
031: public class MERGE3 extends Protocol {
032: Address local_addr = null;
033: long min_interval = 5000; // minimum time between executions of the FindSubgroups task
034: long max_interval = 20000; // maximum time between executions of the FindSubgroups task
035: boolean is_coord = false;
036: final Vector mbrs = new Vector();
037: TimeScheduler timer = null;
038: CoordinatorAnnouncer announcer_task = null;
039: final Set announcements = Collections
040: .synchronizedSet(new HashSet());
041:
042: /** Use a new thread to send the MERGE event up the stack */
043: boolean use_separate_thread = false;
044:
045: public String getName() {
046: return "MERGE3";
047: }
048:
049: public boolean setProperties(Properties props) {
050: String str;
051:
052: super .setProperties(props);
053: str = props.getProperty("min_interval");
054: if (str != null) {
055: min_interval = Long.parseLong(str);
056: props.remove("min_interval");
057: }
058:
059: str = props.getProperty("max_interval");
060: if (str != null) {
061: max_interval = Long.parseLong(str);
062: props.remove("max_interval");
063: }
064:
065: if (min_interval <= 0 || max_interval <= 0) {
066: if (log.isErrorEnabled())
067: log
068: .error("min_interval and max_interval have to be > 0");
069: return false;
070: }
071: if (max_interval <= min_interval) {
072: if (log.isErrorEnabled())
073: log
074: .error("max_interval has to be greater than min_interval");
075: return false;
076: }
077:
078: str = props.getProperty("use_separate_thread");
079: if (str != null) {
080: use_separate_thread = Boolean.valueOf(str).booleanValue();
081: props.remove("use_separate_thread");
082: }
083:
084: if (props.size() > 0) {
085: log
086: .error("MERGE2.setProperties(): the following properties are not recognized: "
087: + props);
088:
089: return false;
090: }
091: return true;
092: }
093:
094: public void init() throws Exception {
095: timer = stack.timer;
096: }
097:
098: /**
099: * This prevents the up-handler thread to be created, which is not needed in the protocol.
100: * DON'T REMOVE !
101: */
102: public void startUpHandler() {
103: }
104:
105: /**
106: * This prevents the down-handler thread to be created, which is not needed in the protocol.
107: * DON'T REMOVE !
108: */
109: public void startDownHandler() {
110: }
111:
112: public void up(Event evt) {
113: switch (evt.getType()) {
114:
115: case Event.MSG:
116: Message msg = (Message) evt.getArg();
117: CoordAnnouncement hdr = (CoordAnnouncement) msg
118: .removeHeader(getName());
119: if (hdr != null) {
120: if (hdr.coord_addr != null && is_coord) {
121: boolean contains;
122: contains = announcements.contains(hdr.coord_addr);
123: announcements.add(hdr.coord_addr);
124: if (log.isDebugEnabled()) {
125: if (contains)
126: log
127: .debug("discarded duplicate announcement: "
128: + hdr.coord_addr
129: + ", announcements="
130: + announcements);
131: else
132: log.debug("received announcement: "
133: + hdr.coord_addr
134: + ", announcements="
135: + announcements);
136: }
137:
138: if (announcements.size() > 1 && is_coord) {
139: processAnnouncements();
140: }
141: }
142: } else
143: passUp(evt);
144: break;
145:
146: case Event.SET_LOCAL_ADDRESS:
147: local_addr = (Address) evt.getArg();
148: passUp(evt);
149: break;
150:
151: default:
152: passUp(evt); // Pass up to the layer above us
153: break;
154: }
155: }
156:
157: public void down(Event evt) {
158: Vector tmp;
159: Address coord;
160:
161: switch (evt.getType()) {
162:
163: case Event.VIEW_CHANGE:
164: passDown(evt);
165: tmp = ((View) evt.getArg()).getMembers();
166: mbrs.clear();
167: mbrs.addAll(tmp);
168: coord = (Address) mbrs.elementAt(0);
169: if (coord.equals(local_addr)) {
170: if (is_coord == false) {
171: is_coord = true;
172: startCoordAnnouncerTask();
173: }
174: } else {
175: if (is_coord == true) {
176: is_coord = false;
177: stopCoordAnnouncerTask();
178: }
179: }
180: break;
181:
182: default:
183: passDown(evt); // Pass on to the layer below us
184: break;
185: }
186: }
187:
188: void startCoordAnnouncerTask() {
189: if (announcer_task == null) {
190: announcements.add(local_addr);
191: announcer_task = new CoordinatorAnnouncer();
192: timer.add(announcer_task);
193: if (log.isDebugEnabled())
194: log
195: .debug("coordinator announcement task started, announcements="
196: + announcements);
197: }
198: }
199:
200: void stopCoordAnnouncerTask() {
201: if (announcer_task != null) {
202: announcer_task.stop();
203: announcer_task = null;
204: announcements.clear();
205: if (log.isDebugEnabled())
206: log.debug("coordinator announcement task stopped");
207: }
208: }
209:
210: /**
211: * Returns a random value within [min_interval - max_interval]
212: */
213: long computeInterval() {
214: return min_interval + Util.random(max_interval - min_interval);
215: }
216:
217: void sendCoordinatorAnnouncement(Address coord) {
218: Message coord_announcement = new Message(); // multicast to all
219: CoordAnnouncement hdr = new CoordAnnouncement(coord);
220: coord_announcement.putHeader(getName(), hdr);
221: passDown(new Event(Event.MSG, coord_announcement));
222: }
223:
224: void processAnnouncements() {
225: if (announcements.size() > 1) {
226: Vector coords = new Vector(announcements); // create a clone
227: if (coords.size() > 1) {
228: if (log.isDebugEnabled())
229: log.debug("passing up MERGE event, coords="
230: + coords);
231: final Event evt = new Event(Event.MERGE, coords);
232: if (use_separate_thread) {
233: Thread merge_notifier = new Thread(Util
234: .getGlobalThreadGroup(),
235: "merge notifier thread") {
236: public void run() {
237: passUp(evt);
238: }
239: };
240: merge_notifier.setDaemon(true);
241: merge_notifier.start();
242: } else {
243: passUp(evt);
244: }
245: }
246: announcements.clear();
247: announcements.add(local_addr);
248: }
249: }
250:
251: class CoordinatorAnnouncer implements TimeScheduler.Task {
252: boolean cancelled = false;
253:
254: public void start() {
255: cancelled = false;
256: }
257:
258: public void stop() {
259: cancelled = true;
260: }
261:
262: public boolean cancelled() {
263: return cancelled;
264: }
265:
266: public long nextInterval() {
267: return computeInterval();
268: }
269:
270: public void run() {
271: if (is_coord)
272: sendCoordinatorAnnouncement(local_addr);
273: }
274: }
275:
276: public static class CoordAnnouncement extends Header {
277: Address coord_addr = null;
278:
279: public CoordAnnouncement() {
280: }
281:
282: public CoordAnnouncement(Address coord) {
283: this .coord_addr = coord;
284: }
285:
286: public void readExternal(ObjectInput in) throws IOException,
287: ClassNotFoundException {
288: coord_addr = (Address) in.readObject();
289: }
290:
291: public void writeExternal(ObjectOutput out) throws IOException {
292: out.writeObject(coord_addr);
293: }
294: }
295:
296: }
|