001: // $Id: GroupRequest.java,v 1.21.2.2 2006/12/09 22:55:26 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.Address;
008: import org.jgroups.Message;
009: import org.jgroups.Transport;
010: import org.jgroups.View;
011: import org.jgroups.util.Command;
012: import org.jgroups.util.Rsp;
013: import org.jgroups.util.RspList;
014:
015: import java.util.*;
016:
017: /**
018: * Sends a message to all members of the group and waits for all responses (or timeout). Returns a
019: * boolean value (success or failure). Results (if any) can be retrieved when done.<p>
020: * The supported transport to send requests is currently either a RequestCorrelator or a generic
021: * Transport. One of them has to be given in the constructor. It will then be used to send a
022: * request. When a message is received by either one, the receiveResponse() of this class has to
023: * be called (this class does not actively receive requests/responses itself). Also, when a view change
024: * or suspicion is received, the methods viewChange() or suspect() of this class have to be called.<p>
025: * When started, an array of responses, correlating to the membership, is created. Each response
026: * is added to the corresponding field in the array. When all fields have been set, the algorithm
027: * terminates.
028: * This algorithm can optionally use a suspicion service (failure detector) to detect (and
029: * exclude from the membership) faulty members. If no suspicion service is available, timeouts
030: * can be used instead (see <code>execute()</code>). When done, a list of suspected members
031: * can be retrieved.<p>
032: * Because a channel might deliver requests, and responses to <em>different</em> requests, the
033: * <code>GroupRequest</code> class cannot itself receive and process requests/responses from the
034: * channel. A mechanism outside this class has to do this; it has to determine what the responses
035: * are for the message sent by the <code>execute()</code> method and call <code>receiveResponse()</code>
036: * to do so.<p>
037: * <b>Requirements</b>: lossless delivery, e.g. acknowledgment-based message confirmation.
038: * @author Bela Ban
039: * @version $Revision: 1.21.2.2 $
040: */
041: public class GroupRequest implements RspCollector, Command {
042: /** return only first response */
043: public static final int GET_FIRST = 1;
044:
045: /** return all responses */
046: public static final int GET_ALL = 2;
047:
048: /** return majority (of all non-faulty members) */
049: public static final int GET_MAJORITY = 3;
050:
051: /** return majority (of all members, may block) */
052: public static final int GET_ABS_MAJORITY = 4;
053:
054: /** return n responses (may block) */
055: public static final int GET_N = 5;
056:
057: /** return no response (async call) */
058: public static final int GET_NONE = 6;
059:
060: private Address caller;
061:
062: /** Map<Address, Rsp>. Maps requests and responses */
063: private final Map requests = new HashMap();
064:
065: /** bounded queue of suspected members */
066: private final Vector suspects = new Vector();
067:
068: /** list of members, changed by viewChange() */
069: private final Collection members = new TreeSet();
070:
071: /** keep suspects vector bounded */
072: private final int max_suspects = 40;
073: protected Message request_msg;
074: protected RequestCorrelator corr; // either use RequestCorrelator or ...
075: protected Transport transport; // Transport (one of them has to be non-null)
076:
077: protected int rsp_mode = GET_ALL;
078: protected boolean done = false;
079: protected long timeout = 0;
080: protected int expected_mbrs = 0;
081:
082: private static final Log log = LogFactory
083: .getLog(GroupRequest.class);
084:
085: /** to generate unique request IDs (see getRequestId()) */
086: private static long last_req_id = 1;
087:
088: private long req_id = -1; // request ID for this request
089:
090: /**
091: @param m The message to be sent
092: @param corr The request correlator to be used. A request correlator sends requests tagged with
093: a unique ID and notifies the sender when matching responses are received. The
094: reason <code>GroupRequest</code> uses it instead of a <code>Transport</code> is
095: that multiple requests/responses might be sent/received concurrently.
096: @param members The initial membership. This value reflects the membership to which the request
097: is sent (and from which potential responses are expected). Is reset by reset().
098: @param rsp_mode How many responses are expected. Can be
099: <ol>
100: <li><code>GET_ALL</code>: wait for all responses from non-suspected members.
101: A suspicion service might warn
102: us when a member from which a response is outstanding has crashed, so it can
103: be excluded from the responses. If no suspision service is available, a
104: timeout can be used (a value of 0 means wait forever). <em>If a timeout of
105: 0 is used, no suspicion service is available and a member from which we
106: expect a response has crashed, this methods blocks forever !</em>.
107: <li><code>GET_FIRST</code>: wait for the first available response.
108: <li><code>GET_MAJORITY</code>: wait for the majority of all responses. The
109: majority is re-computed when a member is suspected.
110: <li><code>GET_ABS_MAJORITY</code>: wait for the majority of
111: <em>all</em> members.
112: This includes failed members, so it may block if no timeout is specified.
113: <li><code>GET_N</CODE>: wait for N members.
114: Return if n is >= membership+suspects.
115: <li><code>GET_NONE</code>: don't wait for any response. Essentially send an
116: asynchronous message to the group members.
117: </ol>
118: */
119: public GroupRequest(Message m, RequestCorrelator corr,
120: Vector members, int rsp_mode) {
121: request_msg = m;
122: this .corr = corr;
123: this .rsp_mode = rsp_mode;
124: reset(members);
125: // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded
126: }
127:
128: /**
129: @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
130: (e.g. if a suspicion service is available; timeouts are not needed).
131: */
132: public GroupRequest(Message m, RequestCorrelator corr,
133: Vector members, int rsp_mode, long timeout,
134: int expected_mbrs) {
135: this (m, corr, members, rsp_mode);
136: if (timeout > 0)
137: this .timeout = timeout;
138: this .expected_mbrs = expected_mbrs;
139: }
140:
141: public GroupRequest(Message m, Transport transport, Vector members,
142: int rsp_mode) {
143: request_msg = m;
144: this .transport = transport;
145: this .rsp_mode = rsp_mode;
146: reset(members);
147: // suspects.removeAllElements(); // bela Aug 23 2002: make suspects bounded
148: }
149:
150: /**
151: * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
152: * (e.g. if a suspicion service is available; timeouts are not needed).
153: */
154: public GroupRequest(Message m, Transport transport, Vector members,
155: int rsp_mode, long timeout, int expected_mbrs) {
156: this (m, transport, members, rsp_mode);
157: if (timeout > 0)
158: this .timeout = timeout;
159: this .expected_mbrs = expected_mbrs;
160: }
161:
162: public Address getCaller() {
163: return caller;
164: }
165:
166: public void setCaller(Address caller) {
167: this .caller = caller;
168: }
169:
170: public boolean execute() throws Exception {
171: return execute(false);
172: }
173:
174: /**
175: * Sends the message. Returns when n responses have been received, or a
176: * timeout has occurred. <em>n</em> can be the first response, all
177: * responses, or a majority of the responses.
178: */
179: public boolean execute(boolean use_anycasting) throws Exception {
180: if (corr == null && transport == null) {
181: if (log.isErrorEnabled())
182: log
183: .error("both corr and transport are null, cannot send group request");
184: return false;
185: }
186:
187: try {
188: done = false;
189: boolean retval = doExecute(use_anycasting, timeout);
190: if (retval == false && log.isTraceEnabled())
191: log.trace("call did not execute correctly, request is "
192: + this .toString());
193: return retval;
194: } finally {
195: done = true;
196: }
197: }
198:
199: /**
200: * This method sets the <code>membership</code> variable to the value of
201: * <code>members</code>. It requires that the caller already hold the
202: * <code>rsp_mutex</code> lock.
203: * @param mbrs The new list of members
204: */
205: public final void reset(Vector mbrs) {
206: if (mbrs != null) {
207: Address mbr;
208: synchronized (requests) {
209: requests.clear();
210: for (int i = 0; i < mbrs.size(); i++) {
211: mbr = (Address) mbrs.elementAt(i);
212: requests.put(mbr, new Rsp(mbr));
213: }
214: }
215: // maintain local membership
216: synchronized (this .members) {
217: this .members.clear();
218: this .members.addAll(mbrs);
219: }
220: } else {
221: synchronized (requests) {
222: Rsp rsp;
223: for (Iterator it = requests.values().iterator(); it
224: .hasNext();) {
225: rsp = (Rsp) it.next();
226: rsp.setReceived(false);
227: rsp.setValue(null);
228: }
229: }
230: }
231: }
232:
233: /* ---------------------- Interface RspCollector -------------------------- */
234: /**
235: * <b>Callback</b> (called by RequestCorrelator or Transport).
236: * Adds a response to the response table. When all responses have been received,
237: * <code>execute()</code> returns.
238: */
239: public void receiveResponse(Object response_value, Address sender) {
240: if (done) {
241: if (log.isWarnEnabled())
242: log.warn("command is done; cannot add response !");
243: return;
244: }
245: if (suspects != null && suspects.size() > 0
246: && suspects.contains(sender)) {
247: if (log.isWarnEnabled())
248: log.warn("received response from suspected member "
249: + sender + "; discarding");
250: return;
251: }
252:
253: synchronized (requests) {
254: Rsp rsp = (Rsp) requests.get(sender);
255: if (rsp != null) {
256: if (rsp.wasReceived() == false) {
257: rsp.setValue(response_value);
258: rsp.setReceived(true);
259: if (log.isTraceEnabled())
260: log.trace(new StringBuffer(
261: "received response for request ")
262: .append(req_id).append(", sender=")
263: .append(sender).append(", val=")
264: .append(response_value));
265: requests.notifyAll(); // wakes up execute()
266: }
267: }
268: }
269: }
270:
271: /**
272: * <b>Callback</b> (called by RequestCorrelator or Transport).
273: * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).
274: * This method would probably be called when getting a suspect message from a failure detector
275: * (where available). It is used to exclude faulty members from the response list.
276: */
277: public void suspect(Address suspected_member) {
278: Rsp rsp;
279:
280: if (suspected_member == null)
281: return;
282:
283: addSuspect(suspected_member);
284:
285: synchronized (requests) {
286: rsp = (Rsp) requests.get(suspected_member);
287: if (rsp != null) {
288: rsp.setSuspected(true);
289: rsp.setValue(null);
290: requests.notifyAll();
291: }
292: }
293: }
294:
295: /**
296: * Any member of 'membership' that is not in the new view is flagged as
297: * SUSPECTED. Any member in the new view that is <em>not</em> in the
298: * membership (ie, the set of responses expected for the current RPC) will
299: * <em>not</em> be added to it. If we did this we might run into the
300: * following problem:
301: * <ul>
302: * <li>Membership is {A,B}
303: * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the
304: * invocation handler)
305: * <li>C joins while A waits for responses from A and B
306: * <li>If this would generate a new view {A,B,C} and if this expanded the
307: * response set to {A,B,C}, A would wait forever on C's response because C
308: * never received the request in the first place, therefore won't send a
309: * response.
310: * </ul>
311: */
312: public void viewChange(View new_view) {
313: Address mbr;
314: Vector mbrs = new_view != null ? new_view.getMembers() : null;
315: if (requests == null || requests.size() == 0 || mbrs == null)
316: return;
317:
318: synchronized (this .members) {
319: this .members.clear();
320: this .members.addAll(mbrs);
321: }
322:
323: Map.Entry entry;
324: Rsp rsp;
325: boolean modified = false;
326: synchronized (requests) {
327: for (Iterator it = requests.entrySet().iterator(); it
328: .hasNext();) {
329: entry = (Map.Entry) it.next();
330: mbr = (Address) entry.getKey();
331: if (!mbrs.contains(mbr)) {
332: addSuspect(mbr);
333: rsp = (Rsp) entry.getValue();
334: rsp.setValue(null);
335: rsp.setSuspected(true);
336: modified = true;
337: }
338: }
339: if (modified)
340: requests.notifyAll();
341: }
342: }
343:
344: /* -------------------- End of Interface RspCollector ----------------------------------- */
345:
346: /** Returns the results as a RspList */
347: public RspList getResults() {
348: synchronized (requests) {
349: Collection rsps = requests.values();
350: return new RspList(rsps);
351: }
352: }
353:
354: public String toString() {
355: StringBuffer ret = new StringBuffer(128);
356: ret.append("[GroupRequest:\n");
357: ret.append("req_id=").append(req_id).append('\n');
358: if (caller != null)
359: ret.append("caller=").append(caller).append("\n");
360:
361: Map.Entry entry;
362: Address mbr;
363: Rsp rsp;
364: synchronized (requests) {
365: for (Iterator it = requests.entrySet().iterator(); it
366: .hasNext();) {
367: entry = (Map.Entry) it.next();
368: mbr = (Address) entry.getKey();
369: rsp = (Rsp) entry.getValue();
370: ret.append(mbr).append(": ").append(rsp).append("\n");
371: }
372: }
373: if (suspects.size() > 0)
374: ret.append("\nsuspects: ").append(suspects);
375: ret.append("\nrequest_msg: ").append(request_msg);
376: ret.append("\nrsp_mode: ").append(modeToString(rsp_mode));
377: ret.append("\ndone: ").append(done);
378: ret.append("\ntimeout: ").append(timeout);
379: ret.append("\nexpected_mbrs: ").append(expected_mbrs);
380: ret.append("\n]");
381: return ret.toString();
382: }
383:
384: public int getNumSuspects() {
385: return suspects.size();
386: }
387:
388: public Vector getSuspects() {
389: return suspects;
390: }
391:
392: public boolean isDone() {
393: return done;
394: }
395:
396: /* --------------------------------- Private Methods -------------------------------------*/
397:
398: private int determineMajority(int i) {
399: return i < 2 ? i : (i / 2) + 1;
400: }
401:
402: /** Generates a new unique request ID */
403: private static synchronized long getRequestId() {
404: long result = System.currentTimeMillis();
405: if (result <= last_req_id) {
406: result = last_req_id + 1;
407: }
408: last_req_id = result;
409: return result;
410: }
411:
412: /** This method runs with rsp_mutex locked (called by <code>execute()</code>). */
413: private boolean doExecute(boolean use_anycasting, long timeout)
414: throws Exception {
415: long start_time = 0;
416: Address suspect;
417: req_id = getRequestId();
418: reset(null); // clear 'responses' array
419:
420: synchronized (requests) {
421: for (int i = 0; i < suspects.size(); i++) { // mark all suspects in 'received' array
422: suspect = (Address) suspects.elementAt(i);
423: Rsp rsp = (Rsp) requests.get(suspect);
424: if (rsp != null) {
425: rsp.setSuspected(true);
426: break; // we can break here because we ensure there are no duplicate members
427: }
428: }
429: }
430:
431: try {
432: if (log.isTraceEnabled())
433: log.trace(new StringBuffer("sending request (id=")
434: .append(req_id).append(')'));
435: if (corr != null) {
436: java.util.List tmp = new Vector(members);
437: corr.sendRequest(req_id, tmp, request_msg,
438: rsp_mode == GET_NONE ? null : this ,
439: use_anycasting);
440: } else {
441: if (use_anycasting) {
442: List tmp = new ArrayList(members);
443: Message copy;
444: Address mbr;
445: for (Iterator it = tmp.iterator(); it.hasNext();) {
446: mbr = (Address) it.next();
447: copy = request_msg.copy(true);
448: copy.setDest(mbr);
449: transport.send(copy);
450: }
451: } else {
452: transport.send(request_msg);
453: }
454: }
455: } catch (Exception ex) {
456: if (corr != null)
457: corr.done(req_id);
458: throw ex;
459: }
460:
461: synchronized (requests) {
462: if (timeout <= 0) {
463: while (true) { /* Wait for responses: */
464: adjustMembership(); // may not be necessary, just to make sure...
465: if (responsesComplete()) {
466: if (corr != null) {
467: corr.done(req_id);
468: }
469: if (log.isTraceEnabled()) {
470: log.trace("received all responses: "
471: + toString());
472: }
473: return true;
474: }
475: try {
476: requests.wait();
477: } catch (Exception e) {
478: }
479: }
480: } else {
481: start_time = System.currentTimeMillis();
482: while (timeout > 0) { /* Wait for responses: */
483: if (responsesComplete()) {
484: if (corr != null)
485: corr.done(req_id);
486: if (log.isTraceEnabled())
487: log.trace("received all responses: "
488: + toString());
489: return true;
490: }
491: timeout = timeout
492: - (System.currentTimeMillis() - start_time);
493: if (timeout > 0) {
494: try {
495: requests.wait(timeout);
496: } catch (Exception e) {
497: }
498: }
499: }
500: if (corr != null) {
501: corr.done(req_id);
502: }
503: return false;
504: }
505: }
506: }
507:
508: private boolean responsesComplete() {
509: int num_received = 0, num_not_received = 0, num_suspected = 0;
510: final int num_total = requests.size();
511:
512: Rsp rsp;
513: for (Iterator it = requests.values().iterator(); it.hasNext();) {
514: rsp = (Rsp) it.next();
515: if (rsp.wasReceived()) {
516: num_received++;
517: } else {
518: if (rsp.wasSuspected()) {
519: num_suspected++;
520: } else {
521: num_not_received++;
522: }
523: }
524: }
525:
526: switch (rsp_mode) {
527: case GET_FIRST:
528: if (num_received > 0)
529: return true;
530: if (num_suspected >= num_total)
531: // e.g. 2 members, and both suspected
532: return true;
533: break;
534: case GET_ALL:
535: return num_received + num_suspected >= num_total;
536: case GET_MAJORITY:
537: int majority = determineMajority(num_total);
538: if (num_received + num_suspected >= majority)
539: return true;
540: break;
541: case GET_ABS_MAJORITY:
542: majority = determineMajority(num_total);
543: if (num_received >= majority)
544: return true;
545: break;
546: case GET_N:
547: if (expected_mbrs >= num_total) {
548: rsp_mode = GET_ALL;
549: return responsesComplete();
550: }
551: if (num_received >= expected_mbrs) {
552: return true;
553: }
554: if (num_received + num_not_received < expected_mbrs) {
555: return num_received + num_suspected >= expected_mbrs;
556: }
557: return false;
558: case GET_NONE:
559: return true;
560: default:
561: if (log.isErrorEnabled())
562: log.error("rsp_mode " + rsp_mode + " unknown !");
563: break;
564: }
565: return false;
566: }
567:
568: /**
569: * Adjusts the 'received' array in the following way:
570: * <ul>
571: * <li>if a member P in 'membership' is not in 'members', P's entry in the 'received' array
572: * will be marked as SUSPECTED
573: * <li>if P is 'suspected_mbr', then P's entry in the 'received' array will be marked
574: * as SUSPECTED
575: * </ul>
576: * This call requires exclusive access to rsp_mutex (called by getResponses() which has
577: * a the rsp_mutex locked, so this should not be a problem).
578: */
579: private void adjustMembership() {
580: if (requests.size() == 0)
581: return;
582:
583: Map.Entry entry;
584: Address mbr;
585: Rsp rsp;
586: for (Iterator it = requests.entrySet().iterator(); it.hasNext();) {
587: entry = (Map.Entry) it.next();
588: mbr = (Address) entry.getKey();
589: if ((!this .members.contains(mbr)) || suspects.contains(mbr)) {
590: addSuspect(mbr);
591: rsp = (Rsp) entry.getValue();
592: rsp.setValue(null);
593: rsp.setSuspected(true);
594: }
595: }
596: }
597:
598: /**
599: * Adds a member to the 'suspects' list. Removes oldest elements from 'suspects' list
600: * to keep the list bounded ('max_suspects' number of elements)
601: */
602: private void addSuspect(Address suspected_mbr) {
603: if (!suspects.contains(suspected_mbr)) {
604: suspects.addElement(suspected_mbr);
605: while (suspects.size() >= max_suspects
606: && suspects.size() > 0)
607: suspects.remove(0); // keeps queue bounded
608: }
609: }
610:
611: private String modeToString(int m) {
612: switch (m) {
613: case GET_FIRST:
614: return "GET_FIRST";
615: case GET_ALL:
616: return "GET_ALL";
617: case GET_MAJORITY:
618: return "GET_MAJORITY";
619: case GET_ABS_MAJORITY:
620: return "GET_ABS_MAJORITY";
621: case GET_N:
622: return "GET_N";
623: case GET_NONE:
624: return "GET_NONE";
625: default:
626: return "<unknown> (" + m + ")";
627: }
628: }
629: }
|