01: //$Id: UdpRingNode.java,v 1.5 2004/09/23 16:29:40 belaban Exp $
02:
03: package org.jgroups.protocols.ring;
04:
05: import org.jgroups.Address;
06: import org.jgroups.Event;
07: import org.jgroups.Message;
08: import org.jgroups.protocols.TOTAL_TOKEN;
09: import org.jgroups.stack.IpAddress;
10: import org.jgroups.stack.RpcProtocol;
11:
12: import java.io.Serializable;
13: import java.util.Vector;
14:
15: public class UdpRingNode implements RingNode {
16:
17: final Address this Node;
18: Address nextNode;
19: final RpcProtocol rpcProtocol;
20: Object token;
21: final Object mutex = new Object();
22: final TOTAL_TOKEN.RingTokenHeader tokenHeader;
23: boolean tokenInStack = false;
24:
25: public UdpRingNode(RpcProtocol owner, Address memberAddress) {
26: rpcProtocol = owner;
27: this Node = memberAddress;
28: nextNode = null;
29: tokenHeader = new TOTAL_TOKEN.RingTokenHeader();
30: }
31:
32: public IpAddress getTokenReceiverAddress() {
33: return (IpAddress) this Node;
34: }
35:
36: public synchronized void tokenArrived(Object token) {
37: tokenInStack = true;
38: this .token = token;
39: this .notifyAll();
40: }
41:
42: public Object receiveToken(int timeout) throws TokenLostException {
43: Address wasNext = nextNode;
44:
45: try {
46:
47: synchronized (this ) {
48: while (!tokenInStack) {
49: wait(timeout);
50: break;
51: }
52:
53: //we haven't received token for the time of timeout
54: if (!tokenInStack) {
55: throw new TokenLostException(
56: "Token wait timout expired", null, wasNext,
57: TokenLostException.WHILE_RECEIVING);
58: }
59: }
60: } catch (InterruptedException ie) {
61: throw new TokenLostException("Token thread interrupted",
62: ie, wasNext, TokenLostException.WHILE_RECEIVING);
63: }
64: return token;
65: }
66:
67: public Object receiveToken() throws TokenLostException {
68: return receiveToken(0);
69: }
70:
71: public synchronized void passToken(Object token) {
72: Message t = new Message(nextNode, this Node,
73: (Serializable) token);
74: t.putHeader(TOTAL_TOKEN.prot_name, tokenHeader);
75: rpcProtocol.passDown(new Event(Event.MSG, t));
76: tokenInStack = false;
77: }
78:
79: public synchronized void reconfigure(Vector newMembers) {
80: if (isNextNeighbourChanged(newMembers)) {
81: nextNode = getNextNode(newMembers);
82: }
83: }
84:
85: private boolean isNextNeighbourChanged(Vector newMembers) {
86: Address oldNeighbour = nextNode;
87: Address newNeighbour = getNextNode(newMembers);
88: return !(newNeighbour.equals(oldNeighbour));
89: }
90:
91: private Address getNextNode(Vector otherNodes) {
92: int myIndex = otherNodes.indexOf(this Node);
93: return (myIndex == otherNodes.size() - 1) ? (Address) otherNodes
94: .firstElement()
95: : (Address) otherNodes.elementAt(myIndex + 1);
96:
97: }
98: }
|