001: // $Id: FLUSH.java,v 1.10.10.1 2007/04/27 08:03:52 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.View;
008: import org.jgroups.blocks.GroupRequest;
009: import org.jgroups.blocks.MethodCall;
010: import org.jgroups.stack.RpcProtocol;
011: import org.jgroups.util.List;
012: import org.jgroups.util.Rsp;
013: import org.jgroups.util.RspList;
014: import org.jgroups.util.Util;
015:
016: import java.util.Enumeration;
017: import java.util.Properties;
018: import java.util.Vector;
019:
020: /**
021: The task of the FLUSH protocol is to flush all pending messages out of the system. This is
022: done before a view change by stopping all senders and then agreeing on what messages
023: should be delivered in the current view (before switching to the new view). A coordinator
024: broadcasts a FLUSH message. The message contains an array of the highest sequence number for each member
025: as seen by the coordinator so far. Each member responds with its highest sequence numbers seen so far (for
026: each member): if its sequence number for a member P is higher than the one sent by the coordinator, it
027: will append the messages apparently not received by the coordinator to its reply. The coordinator (when
028: all replies have been received), computes for each member the lowest and highest sequence number and
029: re-broadcasts messages accordingly (using ACKs rather than NAKs to ensure reliable delivery).<p> Example:
030: <pre>
031:
032: FLUSH ---> (p=10, q=22, r=7)
033:
034: <-- (p=10, q=20, r=7) (did not receive 2 messages from q)
035: <-- (p=12, q=23, r=7) (attached are messages p11, p12, and q23)
036: <-- (p=10, q=22, r=8) (attached is message r8)
037: ---------------------
038: min: 11 21 8
039: max: 12 23 8
040: </pre>
041:
042: The coordinator now computes the range for each member and re-broadcasts messages
043: p11, p12, q21, q22, q23 and r8.
044: This is essentially the exclusive min and inclusive max of all replies. Note that messages p11, p12 and q23
045: were not received by the coordinator itself before. They were only returned as result of the FLUSH replies
046: and the coordinator now re-broadcasts them.
047:
048: */
049: public class FLUSH extends RpcProtocol {
    final Vector mbrs = new Vector(); // current membership, refreshed on VIEW_CHANGE
    boolean is_server = false; // set on BECOME_SERVER; FLUSH requests are ignored before joining
    final Object block_mutex = new Object(); // handleFlush() waits here until BLOCK_OK arrives
    long block_timeout = 5000; // max ms to wait for BLOCK_OK (configurable property "block_timeout")
    Address local_addr = null; // our own address, set via SET_LOCAL_ADDRESS
    boolean blocked = false; // BLOCK: true, VIEW_CHANGE: false
    final Object digest_mutex = new Object(); // guards handoff of 'digest' from GET_MSG_DIGEST_OK
    long digest_timeout = 2000; // time to wait for retrieval of unstable msgs

    final Object highest_delivered_mutex = new Object(); // guards wait for GET_MSGS_RECEIVED_OK reply
    long[] highest_delivered_msgs; // highest seqno delivered to the application, one slot per member

    Digest digest = null; // reply slot: written by GET_MSGS_DIGEST_OK handler, read by getMessageDigest()

    final Object get_msgs_mutex = new Object(); // guards wait for GET_MSGS_OK reply
    final long get_msgs_timeout = 4000; // max ms to wait for GET_MSGS_OK
    List get_msgs = null; // reply slot: messages returned by NAKACK for a GET_MSGS request
067:
    /** Returns the protocol name ("FLUSH"), used to identify this layer in the stack. */
    public String getName() {
        return "FLUSH";
    }
071:
072: public Vector providedUpServices() {
073: Vector retval = new Vector();
074: retval.addElement(new Integer(Event.FLUSH));
075: return retval;
076: }
077:
078: public Vector requiredDownServices() {
079: Vector retval = new Vector();
080: retval.addElement(new Integer(Event.GET_MSGS_RECEIVED)); // NAKACK layer
081: retval.addElement(new Integer(Event.GET_MSG_DIGEST)); // NAKACK layer
082: retval.addElement(new Integer(Event.GET_MSGS)); // NAKACK layer
083: return retval;
084: }
085:
086: public void start() throws Exception {
087: super .start();
088: if (_corr != null) {
089: _corr.setDeadlockDetection(true);
090: } else
091: throw new Exception(
092: "FLUSH.start(): cannot set deadlock detection in corr, as it is null !");
093: }
094:
    /**
       Triggered by reception of FLUSH event from GMS layer (must be coordinator). Calls
       <code>handleFlush</code> in all members and returns FLUSH_OK event.
       @param dests A list of members to which the FLUSH is to be sent
       @return FlushRsp Contains result (true or false), list of unstable messages and list of members
       failed during the FLUSH.
    */
    private FlushRsp flush(Vector dests) {
        RspList rsp_list;
        FlushRsp retval = new FlushRsp();
        Digest digest; // per-member reply: highest seqnos seen plus any messages we are missing
        long[] min, max;
        long[] lower[]; // per-member seqno range (exclusive lower, inclusive upper) still to fetch
        List unstable_msgs = new List();
        boolean get_lower_msgs = false;

        highest_delivered_msgs = new long[members.size()];
        min = new long[members.size()];
        max = new long[members.size()];

        /* Determine the highest seqno (for each member) that was delivered to the application
           (i.e., consumed by the application). Stores result in array 'highest_delivered_msgs' */
        getHighestDeliveredSeqnos();

        // start both bounds at our own delivered seqnos; replies can only widen them
        for (int i = 0; i < highest_delivered_msgs.length; i++)
            min[i] = max[i] = highest_delivered_msgs[i];

        /* Call the handleFlush() method of all existing members. The highest seqnos seen by the coord
           is the argument */
        if (log.isInfoEnabled())
            log.info("calling handleFlush(" + dests + ')');
        passDown(new Event(Event.SWITCH_OUT_OF_BAND)); // we need out-of-band control for FLUSH ...
        MethodCall call = new MethodCall("handleFlush", new Object[] {
                dests, highest_delivered_msgs.clone() }, new String[] {
                Vector.class.getName(), long[].class.getName() });
        // GET_ALL with timeout 0: wait (without time bound) for a reply from every destination
        rsp_list = callRemoteMethods(dests, call, GroupRequest.GET_ALL,
                0);
        if (log.isInfoEnabled())
            log.info("flush done");

        /* Process all the responses (Digest): compute a range of messages (min and max seqno) for each
           member that has to be re-broadcast; FlushRsp contains those messages. They will be re-broadcast
           by the coordinator (in the GMS protocol). */
        for (int i = 0; i < rsp_list.size(); i++) {
            Rsp rsp = (Rsp) rsp_list.elementAt(i);
            if (rsp.wasReceived()) {
                digest = (Digest) rsp.getValue();
                if (digest != null) {
                    for (int j = 0; j < digest.highest_seqnos.length
                            && j < min.length; j++) {
                        min[j] = Math.min(min[j],
                                digest.highest_seqnos[j]);
                        max[j] = Math.max(max[j],
                                digest.highest_seqnos[j]);
                    }
                    if (digest.msgs.size() > 0) {
                        for (Enumeration e = digest.msgs.elements(); e
                                .hasMoreElements();)
                            unstable_msgs.add(e.nextElement());
                    }
                }
            }
        } // end for-loop

        /* If any of the highest msgs of the flush replies were lower than the ones sent by this
           coordinator, we have to re-broadcast them. (This won't occur often)
           Compute the range between min and highest_delivered_msgs */
        lower = new long[min.length][]; // stores (for each mbr) the range of seqnos (e.g. 20 24): send msgs
        // 21, 22 and 23 and 24 (excluding lower and including upper range)

        for (int i = 0; i < min.length; i++) {
            if (min[i] < highest_delivered_msgs[i]) { // will almost never be the case
                lower[i] = new long[2];
                lower[i][0] = min[i]; // lower boundary (excluding)
                lower[i][1] = highest_delivered_msgs[i]; // upper boundary (including)
                get_lower_msgs = true;
            }
        }
        if (get_lower_msgs) {
            get_msgs = null;
            synchronized (get_msgs_mutex) {
                // ask NAKACK for those messages; GET_MSGS_OK sets 'get_msgs' and notifies
                passDown(new Event(Event.GET_MSGS, lower));
                try {
                    get_msgs_mutex.wait(get_msgs_timeout);
                } catch (Exception e) {
                }
            }
            if (get_msgs != null) {
                for (Enumeration e = get_msgs.elements(); e
                        .hasMoreElements();)
                    unstable_msgs.add(e.nextElement());
            }
        }
        retval.unstable_msgs = unstable_msgs.getContents();
        // members that never replied are reported as failed; GMS decides what to do with them
        if (rsp_list.numSuspectedMembers() > 0) {
            retval.result = false;
            retval.failed_mbrs = rsp_list.getSuspectedMembers();
        }

        return retval;
    }
196:
    /**
       Called by coordinator running the FLUSH protocol. Argument is an array of the highest seqnos as seen
       by the coordinator (for each member). <code>handleFlush()</code> checks for each member its
       own highest seqno seen for that member. If it is higher than the one seen by the coordinator,
       all higher messages are attached to the return value (a message digest).
       @param flush_dests The members to which this message is sent. Processes not in this list just
       ignore the handleFlush() message.
       @param highest_seqnos The highest sequence numbers (order corresponding to membership) as seen
       by coordinator.
       @return Digest An array of the highest seqnos for each member, as seen by this member. If this
       member's seqno for a member P is higher than the one in <code>highest_seqnos</code>,
       the missing messages are added to the message digest as well. This allows the
       coordinator to re-broadcast missing messages. Null if this member is not yet a
       server, or is not among (or has no) flush destinations.
    */
    public synchronized Digest handleFlush(Vector flush_dests,
            long[] highest_seqnos) {
        digest = null; // reset reply slot; filled in by getMessageDigest() below

        if (log.isInfoEnabled())
            log.info("flush_dests=" + flush_dests
                    + " , highest_seqnos="
                    + Util.array2String(highest_seqnos));

        if (!is_server) // don't handle the FLUSH if not yet joined to the group
            return digest;

        if (flush_dests == null) {
            if (log.isWarnEnabled())
                log.warn("flush dest is null, ignoring flush !");
            return digest;
        }

        if (flush_dests.size() == 0) {
            if (log.isWarnEnabled())
                log.warn("flush dest is empty, ignoring flush !");
            return digest;
        }

        if (!flush_dests.contains(local_addr)) {

            if (log.isWarnEnabled())
                log.warn("am not in the flush dests, ignoring flush");
            return digest;
        }

        // block sending of messages (only if not already blocked !)
        // Sends a BLOCK event up to the application, then waits (bounded by block_timeout)
        // for the BLOCK_OK that receiveDownEvent() converts into a notify on block_mutex.
        if (!blocked) {
            blocked = true;
            synchronized (block_mutex) {
                passUp(new Event(Event.BLOCK));
                try {
                    block_mutex.wait(block_timeout);
                } catch (Exception e) {
                }
            }
        }

        // asks NAKACK layer for unstable messages and saves result in 'digest'
        getMessageDigest(highest_seqnos);
        if (log.isInfoEnabled())
            log.info("returning digest : " + digest);
        return digest;
    }
260:
261: /** Returns the highest seqnos (for each member) seen so far (using the NAKACK layer) */
262: void getHighestDeliveredSeqnos() {
263: synchronized (highest_delivered_mutex) {
264: passDown(new Event(Event.GET_MSGS_RECEIVED));
265: try {
266: highest_delivered_mutex.wait(4000);
267: } catch (Exception e) {
268: if (log.isDebugEnabled())
269: log.debug("exception is " + e);
270: }
271: }
272: }
273:
274: /** Interacts with a lower layer to retrieve unstable messages (e.g. NAKACK) */
275: void getMessageDigest(long[] highest_seqnos) {
276: synchronized (digest_mutex) {
277: passDown(new Event(Event.GET_MSG_DIGEST, highest_seqnos));
278: try {
279: digest_mutex.wait(digest_timeout);
280: } catch (Exception e) {
281: }
282: }
283: }
284:
285: /**
286: <b>Callback</b>. Called by superclass when event may be handled.<p>
287: <b>Do not use <code>PassUp</code> in this method as the event is passed up
288: by default by the superclass after this method returns !</b>
289: @return boolean Defaults to true. If false, event will not be passed up the stack.
290: */
291: public boolean handleUpEvent(Event evt) {
292: switch (evt.getType()) {
293:
294: case Event.SET_LOCAL_ADDRESS:
295: local_addr = (Address) evt.getArg();
296: break;
297:
298: case Event.GET_MSG_DIGEST_OK:
299: synchronized (digest_mutex) {
300: digest = (Digest) evt.getArg();
301: digest_mutex.notifyAll();
302: }
303: return false; // don't pass further up
304:
305: case Event.GET_MSGS_RECEIVED_OK:
306: long[] tmp = (long[]) evt.getArg();
307: if (tmp != null)
308: System.arraycopy(tmp, 0, highest_delivered_msgs, 0,
309: tmp.length);
310: synchronized (highest_delivered_mutex) {
311: highest_delivered_mutex.notifyAll();
312: }
313: return false; // don't pass up any further !
314:
315: case Event.GET_MSGS_OK:
316: synchronized (get_msgs_mutex) {
317: get_msgs = (List) evt.getArg();
318: get_msgs_mutex.notifyAll();
319: }
320: break;
321:
322: }
323: return true;
324: }
325:
326: /**
327: <b>Callback</b>. Called by superclass when event may be handled.<p>
328: <b>Do not use <code>PassDown</code> in this method as the event is passed down
329: by default by the superclass after this method returns !</b>
330: @return boolean Defaults to true. If false, event will not be passed down the stack.
331: */
332: public boolean handleDownEvent(Event evt) {
333: Vector dests;
334: FlushRsp rsp;
335:
336: switch (evt.getType()) {
337: case Event.FLUSH:
338: dests = (Vector) evt.getArg();
339: if (dests == null)
340: dests = new Vector();
341: rsp = flush(dests);
342: passUp(new Event(Event.FLUSH_OK, rsp));
343: return false; // don't pass down
344:
345: case Event.BECOME_SERVER:
346: is_server = true;
347: break;
348:
349: case Event.VIEW_CHANGE:
350: blocked = false;
351:
352: Vector tmp = ((View) evt.getArg()).getMembers();
353: if (tmp != null) {
354: mbrs.removeAllElements();
355: for (int i = 0; i < tmp.size(); i++)
356: mbrs.addElement(tmp.elementAt(i));
357: }
358: break;
359: }
360: return true;
361: }
362:
    /**
       The default handling adds the event to the down-queue where events are handled in order of
       addition by a thread. However, there exists a deadlock between the FLUSH and BLOCK_OK down
       events: when a FLUSH event is received, a BLOCK is sent up, which triggers a BLOCK_OK event
       to be sent down to be handled by the FLUSH layer. However, the FLUSH layer's thread is still
       processing the FLUSH down event and is therefore blocked, waiting for a BLOCK_OK event.
       Therefore, the BLOCK_OK event has to 'preempt' the FLUSH event processing. This is done by
       overriding this method: when a BLOCK_OK event is received, it is processed immediately
       (in parallel to the FLUSH event), which causes the FLUSH event processing to return.
    */
    public void receiveDownEvent(Event evt) {
        if (evt.getType() == Event.BLOCK_OK) { // priority handling, otherwise FLUSH would block !
            // First drain events already queued ahead of us so ordering is preserved.
            synchronized (down_queue) {
                Event event;
                try {
                    while (down_queue.size() > 0) {
                        event = (Event) down_queue.remove(10); // wait 10ms at most; queue is *not* empty !
                        down(event);
                    }
                } catch (Exception e) {
                    // best effort: a timeout or closed queue simply ends the drain loop
                }
            }

            // Wake up handleFlush(), which is waiting on block_mutex for this BLOCK_OK.
            synchronized (block_mutex) {
                block_mutex.notifyAll();
            }
            return;
        }
        super.receiveDownEvent(evt);
    }
393:
394: public boolean setProperties(Properties props) {
395: super .setProperties(props);
396: String str;
397:
398: str = props.getProperty("block_timeout");
399: if (str != null) {
400: block_timeout = Long.parseLong(str);
401: props.remove("block_timeout");
402: }
403:
404: str = props.getProperty("digest_timeout");
405: if (str != null) {
406: digest_timeout = Long.parseLong(str);
407: props.remove("digest_timeout");
408: }
409:
410: if (props.size() > 0) {
411: log
412: .error("EXAMPLE.setProperties(): these properties are not recognized: "
413: + props);
414:
415: return false;
416: }
417: return true;
418: }
419:
420: }
|