001: package org.jgroups.protocols;
002:
003: import org.jgroups.stack.Protocol;
004: import org.jgroups.Event;
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007:
008: import java.util.List;
009: import java.util.LinkedList;
010: import java.util.Vector;
011:
012: /**
013: * Class that waits for n PingRsp'es, or m milliseconds to return the initial membership
014: * @author Bela Ban
015: * @version $Id: PingWaiter.java,v 1.11.10.2 2007/04/27 08:03:52 belaban Exp $
016: */
017: public class PingWaiter implements Runnable {
018: Thread thread = null;
019: final List rsps = new LinkedList();
020: long timeout = 3000;
021: int num_rsps = 3;
022: Protocol parent = null;
023: PingSender ping_sender;
024: protected final Log log = LogFactory.getLog(this .getClass());
025: private boolean trace = log.isTraceEnabled();
026:
027: public PingWaiter(long timeout, int num_rsps, Protocol parent,
028: PingSender ping_sender) {
029: this .timeout = timeout;
030: this .num_rsps = num_rsps;
031: this .parent = parent;
032: this .ping_sender = ping_sender;
033: }
034:
035: void setTimeout(long timeout) {
036: this .timeout = timeout;
037: }
038:
039: void setNumRsps(int num) {
040: this .num_rsps = num;
041: }
042:
043: public synchronized void start() {
044: // ping_sender.start();
045: if (thread == null || !thread.isAlive()) {
046: thread = new Thread(this , "PingWaiter");
047: thread.setDaemon(true);
048: thread.start();
049: }
050: }
051:
052: public synchronized void stop() {
053: if (ping_sender != null)
054: ping_sender.stop();
055: if (thread != null) {
056: // Thread tmp=t;
057: thread = null;
058: // tmp.interrupt();
059: synchronized (rsps) {
060: rsps.notifyAll();
061: }
062: }
063: }
064:
065: public synchronized boolean isRunning() {
066: return thread != null && thread.isAlive();
067: }
068:
069: public void addResponse(PingRsp rsp) {
070: if (rsp != null) {
071: synchronized (rsps) {
072: if (rsps.contains(rsp))
073: rsps.remove(rsp); // overwrite existing element
074: rsps.add(rsp);
075: rsps.notifyAll();
076: }
077: }
078: }
079:
080: public void clearResponses() {
081: synchronized (rsps) {
082: rsps.clear();
083: rsps.notifyAll();
084: }
085: }
086:
087: public List getResponses() {
088: return rsps;
089: }
090:
091: public void run() {
092: Vector responses = findInitialMembers();
093: synchronized (this ) {
094: thread = null;
095: }
096: if (parent != null)
097: parent.passUp(new Event(Event.FIND_INITIAL_MBRS_OK,
098: responses));
099: }
100:
101: public Vector findInitialMembers() {
102: long start_time, time_to_wait;
103:
104: synchronized (rsps) {
105: if (rsps.size() > 0) {
106: rsps.clear();
107: }
108:
109: ping_sender.start();
110:
111: start_time = System.currentTimeMillis();
112: time_to_wait = timeout;
113:
114: try {
115: while (rsps.size() < num_rsps && time_to_wait > 0
116: && thread != null
117: && Thread.currentThread().equals(thread)) {
118: if (log.isTraceEnabled()) // +++ remove
119: log
120: .trace(new StringBuffer(
121: "waiting for initial members: time_to_wait=")
122: .append(time_to_wait).append(
123: ", got ").append(
124: rsps.size()).append(
125: " rsps"));
126:
127: try {
128: rsps.wait(time_to_wait);
129: } catch (InterruptedException intex) {
130: } catch (Exception e) {
131: log
132: .error(
133: "got an exception waiting for responses",
134: e);
135: }
136: time_to_wait = timeout
137: - (System.currentTimeMillis() - start_time);
138: }
139: if (log.isTraceEnabled())
140: log.trace(new StringBuffer("initial mbrs are ")
141: .append(rsps));
142: return new Vector(rsps);
143: } finally {
144: if (ping_sender != null)
145: ping_sender.stop();
146: }
147: }
148: }
149:
150: }
|