001: package org.jgroups.protocols;
002:
003: import EDU.oswego.cs.dl.util.concurrent.*;
004: import org.jgroups.*;
005: import org.jgroups.stack.Protocol;
006: import org.jgroups.util.BoundedList;
007: import org.jgroups.util.Streamable;
008: import org.jgroups.util.Util;
009:
010: import java.io.*;
011: import java.util.*;
012: import java.util.Map.Entry;
013:
014: /**
015: * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes
016: * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of
017: * how many credits it has received from a sender. When credits for a sender fall below a threshold,
018: * the receiver sends more credits to the sender. Works for both unicast and multicast messages.
019: * <p/>
020: * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this
021: * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down().
022: * <br/>This is the second simplified implementation of the same model. The algorithm is sketched out in
023: * doc/FlowControl.txt
024: * <br/>
025: * Changes (Brian) April 2006:
026: * <ol>
027: * <li>Receivers now send credits to a sender when more than min_credits have been received (rather than when min_credits
028: * are left)
029: * <li>Receivers don't send the full credits (max_credits), but rather the actual number of bytes received
030: * </ol>
031: * @author Bela Ban
032: * @version $Id: FC.java,v 1.53.2.11 2007/04/27 08:03:51 belaban Exp $
033: */
034: public class FC extends Protocol {
035:
/**
 * Sender-side credit table, Map<Address,Long>: keys are members, values are the credits (bytes)
 * we may still send to that member. For each send, the number of credits is decremented by the
 * message size. All modifications in this class are made while holding {@link #lock}.
 */
final Map sent = new HashMap(11);
// final Map sent=new ConcurrentHashMap(11);

/**
 * Receiver-side credit table, Map<Address,Long>: keys are members, values are credits left (in bytes).
 * For each receive, the credits for the sender are decremented by the size of the received message.
 * Once at least <tt>min_credits</tt> bytes have been consumed, the entry is reset to
 * <tt>max_credits</tt> and a REPLENISH message carrying the consumed amount is sent to the sender.
 */
final Map received = new ConcurrentReaderHashMap(11);
// final Map received=new ConcurrentHashMap(11);

/**
 * List of members from whom we expect credits
 */
final List creditors = new ArrayList(11);

/** Peers who have asked for credit at a time when we had none to give back */
final Set pending_requesters = new HashSet(11);

/**
 * Max number of bytes to send per receiver until an ack must
 * be received before continuing sending
 */
private long max_credits = 500000;
/** Boxed copy of <tt>max_credits</tt>, shared as the "full credits" value stored in the maps */
private Long max_credits_constant = new Long(max_credits);

/**
 * Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send
 * a CREDIT_REQUEST to the members from which we expect credits. A value <= 0 means to
 * wait forever.
 */
private long max_block_time = 5000;

/**
 * Fraction of <tt>max_credits</tt> used to derive <tt>min_credits</tt> in setProperties() when
 * <tt>min_credits</tt> is not configured explicitly
 */
private double min_threshold = 0.25;

/**
 * Computed as <tt>max_credits</tt> times <tt>min_threshold</tt>. If explicitly set, this will
 * override the above computation
 */
private long min_credits = 0;

/**
 * Whether FC is still running, this is set to false when the protocol terminates (on stop())
 */
private boolean running = true;

/**
 * Determines whether or not to block on down(). Set when not enough credit is available to send a message
 * to all or a single member
 */
private boolean insufficient_credit = false;

/**
 * the lowest credits of any destination (in <tt>sent</tt>)
 */
private long lowest_credit = max_credits;

/**
 * Lock to be used with the CondVar below.
 */
final Sync lock = new ReentrantLock();

/**
 * Condition variable (bound to <tt>lock</tt>) on which senders block in down()
 */
final CondVar mutex = new CondVar(lock);

/**
 * Whether an up thread that comes back down should be allowed to
 * bypass blocking if all credits are exhausted. Avoids JGRP-465.
 */
private boolean ignore_synchronous_response = true;

/**
 * Thread that carries messages through up() and shouldn't be blocked
 * in down() if ignore_synchronous_response==true. JGRP-465.
 */
private Thread ignore_thread;

/** Protocol name; also the key under which FcHeader is stored in messages */
static final String name = "FC";

/** Timestamp (ms) at which the current blocking phase started; only used for trace output */
private long start_blocking = 0;

/**
 * Map<Address, Long> of the last time we requested credit
 */
private final Map last_credit_request = new ConcurrentHashMap();

// ---- statistics ----
private int num_blockings = 0;
private int num_credit_requests_received = 0,
        num_credit_requests_sent = 0;
private int num_credit_responses_sent = 0,
        num_credit_responses_received = 0;
private long total_time_blocking = 0;

/** Durations (ms, as Long) of the most recent blockings; bounded to 50 entries */
final BoundedList last_blockings = new BoundedList(50);

/** Shared header instance attached to every REPLENISH message */
final static FcHeader REPLENISH_HDR = new FcHeader(
        FcHeader.REPLENISH);
/** Shared header instance attached to every CREDIT_REQUEST message */
final static FcHeader CREDIT_REQUEST_HDR = new FcHeader(
        FcHeader.CREDIT_REQUEST);
146:
147: public final String getName() {
148: return name;
149: }
150:
151: public void resetStats() {
152: super .resetStats();
153: num_blockings = 0;
154: num_credit_responses_sent = num_credit_responses_received = num_credit_requests_received = num_credit_requests_sent = 0;
155: total_time_blocking = 0;
156: last_blockings.removeAll();
157: }
158:
159: public long getMaxCredits() {
160: return max_credits;
161: }
162:
163: public void setMaxCredits(long max_credits) {
164: this .max_credits = max_credits;
165: max_credits_constant = new Long(this .max_credits);
166: }
167:
168: public double getMinThreshold() {
169: return min_threshold;
170: }
171:
172: public void setMinThreshold(double min_threshold) {
173: this .min_threshold = min_threshold;
174: }
175:
176: public long getMinCredits() {
177: return min_credits;
178: }
179:
180: public void setMinCredits(long min_credits) {
181: this .min_credits = min_credits;
182: }
183:
184: public boolean isBlocked() {
185: return insufficient_credit;
186: }
187:
188: public int getNumberOfBlockings() {
189: return num_blockings;
190: }
191:
192: public long getMaxBlockTime() {
193: return max_block_time;
194: }
195:
196: public void setMaxBlockTime(long t) {
197: max_block_time = t;
198: }
199:
200: public long getTotalTimeBlocked() {
201: return total_time_blocking;
202: }
203:
204: public double getAverageTimeBlocked() {
205: return num_blockings == 0 ? 0.0 : total_time_blocking
206: / (double) num_blockings;
207: }
208:
209: public int getNumberOfCreditRequestsReceived() {
210: return num_credit_requests_received;
211: }
212:
213: public int getNumberOfCreditRequestsSent() {
214: return num_credit_requests_sent;
215: }
216:
217: public int getNumberOfCreditResponsesReceived() {
218: return num_credit_responses_received;
219: }
220:
221: public int getNumberOfCreditResponsesSent() {
222: return num_credit_responses_sent;
223: }
224:
225: public String printSenderCredits() {
226: return printMap(sent);
227: }
228:
229: public String printReceiverCredits() {
230: return printMap(received);
231: }
232:
233: public String printCredits() {
234: StringBuffer sb = new StringBuffer();
235: sb.append("senders:\n").append(printMap(sent)).append(
236: "\n\nreceivers:\n").append(printMap(received));
237: return sb.toString();
238: }
239:
240: public Map dumpStats() {
241: Map retval = super .dumpStats();
242: if (retval == null)
243: retval = new HashMap();
244: retval.put("senders", printMap(sent));
245: retval.put("receivers", printMap(received));
246: retval.put("num_blockings", new Integer(this .num_blockings));
247: retval.put("avg_time_blocked", new Double(
248: getAverageTimeBlocked()));
249: retval.put("num_replenishments", new Integer(
250: this .num_credit_responses_received));
251: retval.put("total_time_blocked", new Long(total_time_blocking));
252: return retval;
253: }
254:
255: public String showLastBlockingTimes() {
256: return last_blockings.toString();
257: }
258:
259: /**
260: * Allows to unblock a blocked sender from an external program, e.g. JMX
261: */
262: public void unblock() {
263: if (Util.acquire(lock)) {
264: try {
265: if (log.isTraceEnabled())
266: log
267: .trace("unblocking the sender and replenishing all members, creditors are "
268: + creditors);
269:
270: Map.Entry entry;
271: for (Iterator it = sent.entrySet().iterator(); it
272: .hasNext();) {
273: entry = (Map.Entry) it.next();
274: entry.setValue(max_credits_constant);
275: }
276:
277: lowest_credit = computeLowestCredit(sent);
278: creditors.clear();
279: insufficient_credit = false;
280: mutex.broadcast();
281: } finally {
282: Util.release(lock);
283: }
284: }
285: }
286:
287: public boolean setProperties(Properties props) {
288: String str;
289: boolean min_credits_set = false;
290:
291: super .setProperties(props);
292: str = props.getProperty("max_credits");
293: if (str != null) {
294: max_credits = Long.parseLong(str);
295: props.remove("max_credits");
296: }
297:
298: str = props.getProperty("min_threshold");
299: if (str != null) {
300: min_threshold = Double.parseDouble(str);
301: props.remove("min_threshold");
302: }
303:
304: str = props.getProperty("min_credits");
305: if (str != null) {
306: min_credits = Long.parseLong(str);
307: props.remove("min_credits");
308: min_credits_set = true;
309: }
310:
311: if (!min_credits_set)
312: min_credits = (long) ((double) max_credits * min_threshold);
313:
314: str = props.getProperty("max_block_time");
315: if (str != null) {
316: max_block_time = Long.parseLong(str);
317: props.remove("max_block_time");
318: }
319:
320: str = props.getProperty("ignore_synchronous_response");
321: if (str != null) {
322: ignore_synchronous_response = Boolean.valueOf(str)
323: .booleanValue();
324: props.remove("ignore_synchronous_response");
325: }
326:
327: if (!props.isEmpty()) {
328: log.error("the following properties are not recognized: "
329: + props);
330: return false;
331: }
332: max_credits_constant = new Long(max_credits);
333: return true;
334: }
335:
336: public void start() throws Exception {
337: super .start();
338: lock.acquire();
339: try {
340: running = true;
341: insufficient_credit = false;
342: lowest_credit = max_credits;
343: } finally {
344: lock.release();
345: }
346: }
347:
348: public void stop() {
349: super .stop();
350: if (Util.acquire(lock)) {
351: try {
352: running = false;
353: ignore_thread = null;
354: mutex.broadcast(); // notify all threads waiting on the mutex that we are done
355: } finally {
356: Util.release(lock);
357: }
358: }
359: }
360:
361: /**
362: * We need to receive view changes concurrent to messages on the down events: a message might blocks, e.g.
363: * because we don't have enough credits to send to member P. However, if member P crashed, we need to unblock !
364: * @param evt
365: */
366: protected void receiveDownEvent(Event evt) {
367: if (evt.getType() == Event.VIEW_CHANGE) {
368: View v = (View) evt.getArg();
369: Vector mbrs = v.getMembers();
370: handleViewChange(mbrs);
371: }
372: super .receiveDownEvent(evt);
373: }
374:
375: public void down(Event evt) {
376: switch (evt.getType()) {
377: case Event.MSG:
378: handleDownMessage(evt);
379: return;
380: }
381: passDown(evt); // this could potentially use the lower protocol's thread which may block
382: }
383:
384: public void up(Event evt) {
385: switch (evt.getType()) {
386:
387: case Event.MSG:
388:
389: // JGRP-465. We only deal with msgs to avoid having to use
390: // a concurrent collection; ignore views, suspicions, etc
391: // which can come up on unusual threads.
392: if (ignore_thread == null && ignore_synchronous_response)
393: ignore_thread = Thread.currentThread();
394:
395: Message msg = (Message) evt.getArg();
396: FcHeader hdr = (FcHeader) msg.removeHeader(name);
397: if (hdr != null) {
398: switch (hdr.type) {
399: case FcHeader.REPLENISH:
400: num_credit_responses_received++;
401: handleCredit(msg.getSrc(), (Number) msg.getObject());
402: break;
403: case FcHeader.CREDIT_REQUEST:
404: num_credit_requests_received++;
405: Address sender = msg.getSrc();
406: Long sent_credits = (Long) msg.getObject();
407: handleCreditRequest(sender, sent_credits);
408: break;
409: default:
410: log.error("header type " + hdr.type + " not known");
411: break;
412: }
413: return; // don't pass message up
414: } else {
415: adjustCredit(msg);
416: }
417: break;
418:
419: case Event.VIEW_CHANGE:
420: handleViewChange(((View) evt.getArg()).getMembers());
421: break;
422: }
423:
424: passUp(evt);
425: }
426:
/**
 * Applies flow control to an outgoing message and then passes it down.
 * <p/>
 * While holding the lock: if the lowest credit of any destination is not larger than the message
 * length, the calling thread either bypasses blocking (when it is the recorded up-thread,
 * JGRP-465) or waits on the condition variable until <tt>insufficient_credit</tt> is cleared or
 * the protocol is stopped. Whenever a wait has lasted at least <tt>max_block_time</tt> ms in
 * total, credit requests are sent to the current creditors. Afterwards the message length is
 * deducted from the destination's credits. The message itself is passed down after the lock has
 * been released.
 *
 * @param evt the MSG event to send
 */
private void handleDownMessage(Event evt) {
    Message msg = (Message) evt.getArg();
    int length = msg.getLength();
    Address dest = msg.getDest();

    // NOTE(review): if the lock cannot be acquired the message is sent without any credit accounting
    if (Util.acquire(lock)) {
        try {
            if (lowest_credit <= length) {
                if (ignore_synchronous_response
                        && ignore_thread == Thread.currentThread()) { // JGRP-465
                    if (log.isTraceEnabled())
                        log
                                .trace("Bypassing blocking to avoid deadlocking "
                                        + Thread.currentThread());
                } else {
                    determineCreditors(dest, length);

                    long blockStart = System.currentTimeMillis();
                    // only the first thread entering a blocking phase flips the flag and
                    // records the phase start; later threads just join the wait
                    if (!insufficient_credit) {
                        insufficient_credit = true;
                        start_blocking = blockStart;
                        if (log.isTraceEnabled()) {
                            log
                                    .trace("Starting blocking. lowest_credit="
                                            + lowest_credit
                                            + "; msg length ="
                                            + length);
                        }
                    }
                    num_blockings++;

                    while (insufficient_credit && running) {
                        try {
                            mutex.timedwait(max_block_time);
                        } catch (InterruptedException e) {
                            // interruption is ignored; the loop condition decides whether to keep waiting
                        }
                        if (insufficient_credit && running) {
                            // measured from the start of this thread's blocking, not from the last wait
                            long waitTime = System
                                    .currentTimeMillis()
                                    - blockStart;
                            if (log.isTraceEnabled()) {
                                log
                                        .trace("Still waiting for credits -- waiting "
                                                + waitTime + " ms");
                            }

                            // Only ask for credit if we blocked over max_block_time,
                            // otherwise it's not an emergency
                            if (waitTime >= max_block_time) {

                                // Creditors may have been cleared but credit
                                // receipt was insufficient to let all
                                // blocked threads proceed. So, redetermine
                                determineCreditors(dest, length);

                                // snapshot of the balances of the current creditors, taken under the lock
                                Map sent_copy = new HashMap(sent);
                                sent_copy.keySet().retainAll(
                                        creditors);
                                // we need to send the credit requests down *without* holding the lock, otherwise we might
                                // run into the deadlock described in http://jira.jboss.com/jira/browse/JGRP-292
                                Util.release(lock);
                                try {
                                    for (Iterator it = sent_copy
                                            .entrySet().iterator(); it
                                            .hasNext();) {
                                        Map.Entry e = (Entry) it
                                                .next();
                                        sendCreditRequest(
                                                (Address) e
                                                        .getKey(),
                                                (Long) e.getValue());
                                    }
                                } finally {
                                    // NOTE(review): the result of this re-acquire is not checked; if it can
                                    // fail, the code below runs (and later releases) without the lock -- confirm
                                    Util.acquire(lock);
                                }
                            }
                        }
                    }

                    // statistics for this blocking episode
                    long block_time = System.currentTimeMillis()
                            - blockStart;
                    if (log.isTraceEnabled())
                        log.trace("total time blocked: "
                                + block_time + " ms");
                    total_time_blocking += block_time;
                    last_blockings.add(new Long(block_time));
                }
            }

            // charge the message against the destination(s); -1 means no matching entry was found
            long tmp = decrementCredit(sent, dest, length);
            if (tmp != -1)
                lowest_credit = Math.min(tmp, lowest_credit);

        } finally {
            Util.release(lock);
        }
    }

    // send message - either after regular processing, or after blocking (when enough credits available again)
    passDown(evt);
}
528:
529: /**
530: * Checks whether one member (unicast msg) or all members (multicast msg) have enough credits. Add those
531: * that don't to the creditors list
532: * @param dest
533: * @param length
534: */
535: private void determineCreditors(Address dest, int length) {
536: boolean multicast = dest == null || dest.isMulticastAddress();
537: Address mbr;
538: Long credits;
539: if (multicast) {
540: Map.Entry entry;
541: for (Iterator it = sent.entrySet().iterator(); it.hasNext();) {
542: entry = (Map.Entry) it.next();
543: mbr = (Address) entry.getKey();
544: credits = (Long) entry.getValue();
545: if (credits.longValue() <= length) {
546: if (!creditors.contains(mbr))
547: creditors.add(mbr);
548: }
549: }
550: } else {
551: credits = (Long) sent.get(dest);
552: if (credits != null && credits.longValue() <= length) {
553: if (!creditors.contains(dest))
554: creditors.add(dest);
555: }
556: }
557: }
558:
559: /**
560: * Decrements credits from a single member, or all members in sent_msgs, depending on whether it is a multicast
561: * or unicast message. No need to acquire mutex (must already be held when this method is called)
562: * @param dest
563: * @param credits
564: * @return The lowest number of credits left, or -1 if a unicast member was not found
565: */
566: private long decrementCredit(Map m, Address dest, long credits) {
567: boolean multicast = dest == null || dest.isMulticastAddress();
568: long lowest = max_credits, tmp;
569: Long val;
570:
571: if (multicast) {
572: if (m.isEmpty())
573: return -1;
574: Map.Entry entry;
575: for (Iterator it = m.entrySet().iterator(); it.hasNext();) {
576: entry = (Map.Entry) it.next();
577: val = (Long) entry.getValue();
578: tmp = val.longValue();
579: tmp -= credits;
580: entry.setValue(new Long(tmp));
581: lowest = Math.min(tmp, lowest);
582: }
583: return lowest;
584: } else {
585: val = (Long) m.get(dest);
586: if (val != null) {
587: lowest = val.longValue();
588: lowest -= credits;
589: m.put(dest, new Long(lowest));
590: return lowest;
591: }
592: }
593: return -1;
594: }
595:
596: private void handleCredit(Address sender, Number increase) {
597: if (sender == null)
598: return;
599: StringBuffer sb = null;
600:
601: if (Util.acquire(lock)) {
602: try {
603: Long old_credit = (Long) sent.get(sender);
604: long increased = old_credit.longValue()
605: + increase.longValue();
606: Long new_credit = new Long(Math.min(max_credits,
607: increased));
608:
609: if (log.isTraceEnabled()) {
610: sb = new StringBuffer();
611: sb.append("received " + increase + " credit from ")
612: .append(sender).append(", old credit was ")
613: .append(old_credit).append(
614: ", new credits are ").append(
615: new_credit);
616: if (increased > max_credits)
617: sb.append(" ignored over-credit of "
618: + (increased - max_credits));
619: }
620:
621: sent.put(sender, new_credit);
622: lowest_credit = computeLowestCredit(sent);
623: if (!creditors.isEmpty()) { // we are blocked because we expect credit from one or more members
624:
625: if (log.isTraceEnabled())
626: sb.append(".\nCreditors before are: ").append(
627: creditors);
628:
629: creditors.remove(sender);
630:
631: if (log.isTraceEnabled()) {
632: sb.append("\nCreditors after removal of ")
633: .append(sender).append(" are: ")
634: .append(creditors).append(
635: "; lowest_credit=").append(
636: lowest_credit);
637: }
638: }
639:
640: if (insufficient_credit && lowest_credit > 0
641: && creditors.isEmpty()) {
642: insufficient_credit = false;
643: mutex.broadcast();
644: if (log.isTraceEnabled())
645: sb
646: .append("\nTotal block time = "
647: + (System.currentTimeMillis() - start_blocking));
648: }
649:
650: if (log.isTraceEnabled())
651: log.trace(sb.toString());
652: } finally {
653: Util.release(lock);
654: }
655: } else {
656: if (log.isWarnEnabled())
657: log.warn(increase + " credits from " + sender
658: + " were dropped, lock could not be acquired");
659: }
660: }
661:
662: private static long computeLowestCredit(Map m) {
663: Collection credits = m.values(); // List of Longs (credits)
664: Long retval = (Long) Collections.min(credits);
665: return retval.longValue();
666: }
667:
668: /**
669: * Check whether sender has enough credits left. If not, send him some more
670: * @param msg
671: */
672: private void adjustCredit(Message msg) {
673: Address src = msg.getSrc();
674: long length = msg.getLength(); // we don't care about headers for the purpose of flow control
675:
676: if (src == null) {
677: if (log.isErrorEnabled())
678: log.error("src is null");
679: return;
680: }
681:
682: if (length == 0)
683: return; // no effect
684:
685: long remaining_cred = decrementCredit(received, src, length);
686: long credit_response = max_credits - remaining_cred;
687: if (credit_response >= min_credits) {
688: received.put(src, max_credits_constant);
689: if (!pending_requesters.isEmpty())
690: pending_requesters.remove(src);
691: if (log.isTraceEnabled())
692: log.trace("sending " + credit_response
693: + " replenishment credits to " + src);
694: sendCredit(src, credit_response);
695: }
696: }
697:
698: private void handleCreditRequest(Address sender, Long sender_credit) {
699: if (sender == null)
700: return;
701:
702: if (Util.acquire(lock)) {
703: long credit_response = 0;
704: try {
705: Long old_credit = (Long) received.get(sender);
706: if (old_credit != null) {
707: credit_response = max_credits
708: - old_credit.longValue();
709: }
710:
711: if (credit_response > 0) {
712: if (log.isTraceEnabled())
713: log.trace("received credit request from "
714: + sender + ": sending "
715: + credit_response + " credits");
716: received.put(sender, max_credits_constant);
717: pending_requesters.remove(sender);
718: } else {
719: if (pending_requesters.contains(sender)) {
720: // a sender might have negative credits, e.g. -20000. If we subtracted -20000 from max_credits,
721: // we'd end up with max_credits + 20000, and send too many credits back. So if the sender's
722: // credits is negative, we simply send max_credits back
723: long credits_left = sender_credit.longValue();
724: if (credits_left < 0)
725: credits_left = 0;
726: credit_response = max_credits - credits_left;
727: // credit_response = max_credits;
728: received.put(sender, max_credits_constant);
729: pending_requesters.remove(sender);
730: if (log.isWarnEnabled())
731: log
732: .warn("Received two credit requests from "
733: + sender
734: + " without any intervening messages; sending "
735: + credit_response
736: + " credits");
737: } else {
738: pending_requesters.add(sender);
739: if (log.isTraceEnabled())
740: log.trace("received credit request from "
741: + sender
742: + " but have no credits available");
743: }
744: }
745: } finally {
746: Util.release(lock);
747: }
748:
749: if (credit_response > 0)
750: sendCredit(sender, credit_response);
751: }
752: }
753:
754: /**
755: * Returns the max credits. Handling a credit request should be the exception, not the normal case.
756: * @param sender
757: * todo: see if this solves Brian's deadlock problems. If not, use the (commented) method above !
758: */
759: // private void handleCreditRequest(Address sender) {
760: // if(sender == null) {
761: // if(log.isWarnEnabled())
762: // log.warn("sender is null, not able to send credits");
763: // return;
764: // }
765: //
766: // if(log.isTraceEnabled()) {
767: // Long recL = (Long) received.get(sender);
768: // long rec = recL == null ? 0 : max_credits - recL.longValue();
769: // log.trace("received credit request from " + sender + ": sending " +
770: // max_credits + " credits: had received " + rec + " bytes");
771: // }
772: //
773: // received.put(sender, max_credits_constant);
774: // sendCredit(sender, max_credits);
775: // }
776:
777: private void sendCredit(Address dest, long credit) {
778: Number number;
779: if (credit < Integer.MAX_VALUE)
780: number = new Integer((int) credit);
781: else
782: number = new Long(credit);
783: Message msg = new Message(dest, null, number);
784: msg.putHeader(name, REPLENISH_HDR);
785: passDown(new Event(Event.MSG, msg));
786: num_credit_responses_sent++;
787: }
788:
789: /**
790: * Sends a credit request to dest. If the last credit request was sent shortly before (less than max_block_time
791: * milliseconds ago), then we discard the request. This ensures that credit requests are not sent more frequently
792: * than every max_block_time milliseconds, preventing credit request storms
793: * @param dest
794: * @param credit_balance
795: */
796: private void sendCreditRequest(final Address dest,
797: final Long credit_balance) {
798: if (max_block_time > 0) {
799: // This call is made with the lock released, so ensure the get/put is atomic
800: long now = System.currentTimeMillis();
801: Long last = (Long) last_credit_request.get(dest);
802: if (last != null && now - last.longValue() < max_block_time) {
803: return;
804: }
805: last_credit_request.put(dest, new Long(now));
806: }
807:
808: if (log.isTraceEnabled())
809: log.trace("sending credit request to " + dest
810: + "; balance=" + credit_balance);
811:
812: Message msg = new Message(dest, null, credit_balance);
813: msg.putHeader(name, CREDIT_REQUEST_HDR);
814: passDown(new Event(Event.MSG, msg));
815: num_credit_requests_sent++;
816: }
817:
818: private void handleViewChange(Vector mbrs) {
819: Address addr;
820: if (mbrs == null)
821: return;
822: if (log.isTraceEnabled())
823: log.trace("new membership: " + mbrs);
824:
825: if (Util.acquire(lock)) {
826: try {
827: // add members not in membership to received and sent hashmap (with full credits)
828: for (int i = 0; i < mbrs.size(); i++) {
829: addr = (Address) mbrs.elementAt(i);
830: if (!received.containsKey(addr))
831: received.put(addr, max_credits_constant);
832: if (!sent.containsKey(addr))
833: sent.put(addr, max_credits_constant);
834: }
835: // remove members that left
836: for (Iterator it = received.keySet().iterator(); it
837: .hasNext();) {
838: addr = (Address) it.next();
839: if (!mbrs.contains(addr))
840: it.remove();
841: }
842:
843: // remove members that left
844: for (Iterator it = sent.keySet().iterator(); it
845: .hasNext();) {
846: addr = (Address) it.next();
847: if (!mbrs.contains(addr))
848: it.remove(); // modified the underlying map
849: }
850:
851: // remove all creditors which are not in the new view
852: for (int i = 0; i < creditors.size(); i++) {
853: Address creditor = (Address) creditors.get(i);
854: if (!mbrs.contains(creditor))
855: creditors.remove(creditor);
856: }
857:
858: if (log.isTraceEnabled())
859: log.trace("creditors are " + creditors);
860: if (insufficient_credit && creditors.isEmpty()) {
861: lowest_credit = computeLowestCredit(sent);
862: insufficient_credit = false;
863: mutex.broadcast();
864: }
865:
866: // keep it simple and just clear the last_credit_request Map
867: // at worst we get an extra credit request
868: last_credit_request.clear();
869: } finally {
870: Util.release(lock);
871: }
872: }
873: }
874:
875: private static String printMap(Map m) {
876: Map.Entry entry;
877: StringBuffer sb = new StringBuffer();
878: for (Iterator it = m.entrySet().iterator(); it.hasNext();) {
879: entry = (Map.Entry) it.next();
880: sb.append(entry.getKey()).append(": ").append(
881: entry.getValue()).append("\n");
882: }
883: return sb.toString();
884: }
885:
886: public static class FcHeader extends Header implements Streamable {
887: public static final byte REPLENISH = 1;
888: public static final byte CREDIT_REQUEST = 2; // the sender of the message is the requester
889:
890: byte type = REPLENISH;
891:
892: public FcHeader() {
893:
894: }
895:
896: public FcHeader(byte type) {
897: this .type = type;
898: }
899:
900: public long size() {
901: return Global.BYTE_SIZE;
902: }
903:
904: public void writeExternal(ObjectOutput out) throws IOException {
905: out.writeByte(type);
906: }
907:
908: public void readExternal(ObjectInput in) throws IOException,
909: ClassNotFoundException {
910: type = in.readByte();
911: }
912:
913: public void writeTo(DataOutputStream out) throws IOException {
914: out.writeByte(type);
915: }
916:
917: public void readFrom(DataInputStream in) throws IOException,
918: IllegalAccessException, InstantiationException {
919: type = in.readByte();
920: }
921:
922: public String toString() {
923: switch (type) {
924: case REPLENISH:
925: return "REPLENISH";
926: case CREDIT_REQUEST:
927: return "CREDIT_REQUEST";
928: default:
929: return "<invalid type>";
930: }
931: }
932: }
933:
934: }
|