001: //$Id: TcpRingNode.java,v 1.4 2005/08/08 12:45:41 belaban Exp $
002:
003: package org.jgroups.protocols.ring;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.Address;
008: import org.jgroups.SuspectedException;
009: import org.jgroups.TimeoutException;
010: import org.jgroups.blocks.GroupRequest;
011: import org.jgroups.stack.IpAddress;
012: import org.jgroups.stack.RpcProtocol;
013: import org.jgroups.util.Util;
014:
015: import java.io.*;
016: import java.net.ServerSocket;
017: import java.net.Socket;
018: import java.util.Vector;
019:
020: public class TcpRingNode implements RingNode {
021:
022: final ServerSocket tokenReceiver;
023: Socket previous, next;
024: final Address this Node;
025: Address nextNode;
026: ObjectInputStream ios;
027: ObjectOutputStream oos;
028: final RpcProtocol rpcProtocol;
029: final boolean failedOnTokenLostException = false;
030:
031: final Object socketMutex = new Object();
032: protected final Log log = LogFactory.getLog(this .getClass());
033:
034: public TcpRingNode(RpcProtocol owner, Address memberAddress) {
035: tokenReceiver = Util.createServerSocket(12000);
036: rpcProtocol = owner;
037: this Node = memberAddress;
038: nextNode = null;
039: }
040:
041: public IpAddress getTokenReceiverAddress() {
042: return new IpAddress(tokenReceiver.getLocalPort());
043: }
044:
045: public Object receiveToken(int timeout) throws TokenLostException {
046: RingToken token = null;
047: Address wasNextNode = nextNode;
048: try {
049: if (previous == null) {
050: previous = tokenReceiver.accept();
051: ios = new ObjectInputStream((previous.getInputStream()));
052:
053: }
054: previous.setSoTimeout(timeout);
055: token = new RingToken();
056: token.readExternal(ios);
057: } catch (InterruptedIOException io) {
058: //read was blocked for more than a timeout, assume token lost
059: throw new TokenLostException(io.getMessage(), io,
060: wasNextNode, TokenLostException.WHILE_RECEIVING);
061: } catch (ClassNotFoundException cantHappen) {
062: } catch (IOException ioe) {
063: closeSocket(previous);
064: previous = null;
065: if (ios != null) {
066: try {
067: ios.close();
068: } catch (IOException ignored) {
069: }
070: }
071:
072: token = (RingToken) receiveToken(timeout);
073: }
074: return token;
075: }
076:
077: public Object receiveToken() throws TokenLostException {
078: return receiveToken(0);
079: }
080:
081: public void passToken(Object token) throws TokenLostException {
082: synchronized (socketMutex) {
083: try {
084: ((Externalizable) token).writeExternal(oos);
085: oos.flush();
086: oos.reset();
087: } catch (IOException e) {
088: e.printStackTrace();
089: //something went wrong with the next neighbour while it was receiving
090: //token, assume token lost
091: throw new TokenLostException(e.getMessage(), e,
092: nextNode, TokenLostException.WHILE_SENDING);
093: }
094:
095: }
096: }
097:
098: public void tokenArrived(Object token) {
099: //not needed , callback for udp ring
100: }
101:
102: public void reconfigureAll(Vector newMembers) {
103:
104: }
105:
106: public void reconfigure(Vector newMembers) {
107:
108: if (isNextNeighbourChanged(newMembers)) {
109: IpAddress tokenRecieverAddress = null;
110: synchronized (socketMutex) {
111: nextNode = getNextNode(newMembers);
112:
113: if (log.isInfoEnabled())
114: log.info(" next node " + nextNode);
115:
116: try {
117: tokenRecieverAddress = (IpAddress) rpcProtocol
118: .callRemoteMethod(nextNode,
119: "getTokenReceiverAddress",
120: GroupRequest.GET_FIRST, 0);
121: } catch (TimeoutException tim) {
122: if (log.isErrorEnabled())
123: log
124: .error(" timeouted while doing rpc call getTokenReceiverAddress"
125: + tim);
126: tim.printStackTrace();
127: } catch (SuspectedException sus) {
128: if (log.isErrorEnabled())
129: log
130: .error(" suspected node while doing rpc call getTokenReceiverAddress"
131: + sus);
132: sus.printStackTrace();
133: }
134: try {
135: closeSocket(next);
136: next = new Socket(tokenRecieverAddress
137: .getIpAddress(), tokenRecieverAddress
138: .getPort());
139: next.setTcpNoDelay(true);
140: oos = new ObjectOutputStream(next.getOutputStream());
141: } catch (IOException ioe) {
142: if (log.isErrorEnabled())
143: log.error("could not connect to next node "
144: + ioe);
145: ioe.printStackTrace();
146: }
147: }
148: }
149: }
150:
151: private void closeSocket(Socket socket) {
152: if (socket == null)
153: return;
154: try {
155: socket.close();
156: } catch (IOException ioe) {
157: ioe.printStackTrace();
158: }
159:
160: }
161:
162: private boolean isNextNeighbourChanged(Vector newMembers) {
163: Address oldNeighbour = nextNode;
164: Address newNeighbour = getNextNode(newMembers);
165: return !(newNeighbour.equals(oldNeighbour));
166: }
167:
168: private Address getNextNode(Vector otherNodes) {
169: int myIndex = otherNodes.indexOf(this Node);
170: return (myIndex == otherNodes.size() - 1) ? (Address) otherNodes
171: .firstElement()
172: : (Address) otherNodes.elementAt(myIndex + 1);
173:
174: }
175: }
|