001: package org.jgroups.protocols;
002:
003: import org.jgroups.Address;
004: import org.jgroups.Event;
005: import org.jgroups.Message;
006: import org.jgroups.View;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.util.BoundedList;
009:
010: import java.net.InetAddress;
011: import java.net.UnknownHostException;
012: import java.util.Collection;
013: import java.util.Properties;
014: import java.util.Vector;
015:
016: /**
017: * Shared base class for tcpip protocols
018: * @author Scott Marlow
019: */
020: public abstract class BasicTCP extends TP {
021:
022: /** Should we drop unicast messages to suspected members or not */
023: boolean skip_suspected_members = true;
024:
025: /** When we cannot send a message to P (on an exception), then we send a SUSPECT message up the stack */
026: boolean suspect_on_send_failure = false;
027:
028: /** List the maintains the currently suspected members. This is used so we don't send too many SUSPECT
029: * events up the stack (one per message !)
030: */
031: final BoundedList suspected_mbrs = new BoundedList(20);
032: protected InetAddress external_addr = null; // the IP address which is broadcast to other group members
033: protected int start_port = 7800; // find first available port starting at this port
034: protected int end_port = 0; // maximum port to bind to
035: protected long reaper_interval = 0; // time in msecs between connection reaps
036: protected long conn_expire_time = 0; // max time a conn can be idle before being reaped
037: /** Use separate send queues for each connection */
038: boolean use_send_queues = true;
039: int recv_buf_size = 150000;
040: int send_buf_size = 150000;
041: int sock_conn_timeout = 2000; // max time in millis for a socket creation in ConnectionTable
042: boolean tcp_nodelay = false;
043: int linger = -1; // SO_LINGER (number of ms, -1 disables it)
044:
045: public int getStartPort() {
046: return start_port;
047: }
048:
049: public void setStartPort(int start_port) {
050: this .start_port = start_port;
051: }
052:
053: public int getEndPort() {
054: return end_port;
055: }
056:
057: public void setEndPort(int end_port) {
058: this .end_port = end_port;
059: }
060:
061: public long getReaperInterval() {
062: return reaper_interval;
063: }
064:
065: public void setReaperInterval(long reaper_interval) {
066: this .reaper_interval = reaper_interval;
067: }
068:
069: public long getConnExpireTime() {
070: return conn_expire_time;
071: }
072:
073: public void setConnExpireTime(long conn_expire_time) {
074: this .conn_expire_time = conn_expire_time;
075: }
076:
077: public boolean setProperties(Properties props) {
078: String str;
079:
080: super .setProperties(props);
081:
082: str = props.getProperty("start_port");
083: if (str != null) {
084: start_port = Integer.parseInt(str);
085: props.remove("start_port");
086: }
087:
088: str = props.getProperty("end_port");
089: if (str != null) {
090: end_port = Integer.parseInt(str);
091: props.remove("end_port");
092: }
093:
094: str = props.getProperty("external_addr");
095: if (str != null) {
096: try {
097: external_addr = InetAddress.getByName(str);
098: } catch (UnknownHostException unknown) {
099: if (log.isFatalEnabled())
100: log.fatal("(external_addr): host " + str
101: + " not known");
102: return false;
103: }
104: props.remove("external_addr");
105: }
106:
107: str = props.getProperty("reaper_interval");
108: if (str != null) {
109: reaper_interval = Long.parseLong(str);
110: props.remove("reaper_interval");
111: }
112:
113: str = props.getProperty("conn_expire_time");
114: if (str != null) {
115: conn_expire_time = Long.parseLong(str);
116: props.remove("conn_expire_time");
117: }
118:
119: str = props.getProperty("sock_conn_timeout");
120: if (str != null) {
121: sock_conn_timeout = Integer.parseInt(str);
122: props.remove("sock_conn_timeout");
123: }
124:
125: str = props.getProperty("recv_buf_size");
126: if (str != null) {
127: recv_buf_size = Integer.parseInt(str);
128: props.remove("recv_buf_size");
129: }
130:
131: str = props.getProperty("send_buf_size");
132: if (str != null) {
133: send_buf_size = Integer.parseInt(str);
134: props.remove("send_buf_size");
135: }
136:
137: str = props.getProperty("skip_suspected_members");
138: if (str != null) {
139: skip_suspected_members = Boolean.valueOf(str)
140: .booleanValue();
141: props.remove("skip_suspected_members");
142: }
143:
144: str = props.getProperty("suspect_on_send_failure");
145: if (str != null) {
146: suspect_on_send_failure = Boolean.valueOf(str)
147: .booleanValue();
148: props.remove("suspect_on_send_failure");
149: }
150:
151: str = props.getProperty("use_send_queues");
152: if (str != null) {
153: use_send_queues = Boolean.valueOf(str).booleanValue();
154: props.remove("use_send_queues");
155: }
156:
157: str = props.getProperty("tcp_nodelay");
158: if (str != null) {
159: tcp_nodelay = new Boolean(str).booleanValue();
160: props.remove("tcp_nodelay");
161: }
162:
163: str = props.getProperty("linger");
164: if (str != null) {
165: linger = Integer.parseInt(str);
166: props.remove("linger");
167: }
168:
169: return true;
170: }
171:
172: public void init() throws Exception {
173: super .init();
174: if (start_port <= 0) {
175: Protocol dynamic_discovery_prot = stack
176: .findProtocol("MPING");
177: if (dynamic_discovery_prot == null)
178: dynamic_discovery_prot = stack
179: .findProtocol("TCPGOSSIP");
180:
181: if (dynamic_discovery_prot != null) {
182: if (log.isInfoEnabled())
183: log.info("dynamic discovery is present ("
184: + dynamic_discovery_prot
185: + "), so start_port=" + start_port
186: + " is okay");
187: } else {
188: throw new IllegalArgumentException(
189: "start_port cannot be set to "
190: + start_port
191: + ", as no dynamic discovery protocol (e.g. MPING or TCPGOSSIP) has been detected.");
192: }
193: }
194: }
195:
196: public void sendToAllMembers(byte[] data, int offset, int length)
197: throws Exception {
198: Address dest;
199: Vector mbrs = (Vector) members.clone();
200: for (int i = 0; i < mbrs.size(); i++) {
201: dest = (Address) mbrs.elementAt(i);
202: sendToSingleMember(dest, data, offset, length);
203: }
204: }
205:
206: public void sendToSingleMember(Address dest, byte[] data,
207: int offset, int length) throws Exception {
208: if (log.isTraceEnabled())
209: log.trace("dest=" + dest + " (" + length + " bytes)");
210: if (skip_suspected_members) {
211: if (suspected_mbrs.contains(dest)) {
212: if (log.isTraceEnabled())
213: log.trace("will not send unicast message to "
214: + dest + " as it is currently suspected");
215: return;
216: }
217: }
218:
219: try {
220: send(dest, data, offset, length);
221: } catch (Exception e) {
222: if (log.isTraceEnabled())
223: log.trace("failure sending message to " + dest, e);
224: if (suspect_on_send_failure && members.contains(dest)) {
225: if (!suspected_mbrs.contains(dest)) {
226: suspected_mbrs.add(dest);
227: passUp(new Event(Event.SUSPECT, dest));
228: }
229: }
230: }
231: }
232:
233: public String getInfo() {
234: StringBuffer sb = new StringBuffer();
235: sb.append("connections: ").append(printConnections()).append(
236: "\n");
237: return sb.toString();
238: }
239:
240: public void postUnmarshalling(Message msg, Address dest,
241: Address src, boolean multicast) {
242: if (multicast)
243: msg.setDest(null);
244: else
245: msg.setDest(dest);
246: }
247:
248: public void postUnmarshallingList(Message msg, Address dest,
249: boolean multicast) {
250: postUnmarshalling(msg, dest, null, multicast);
251: }
252:
253: public abstract String printConnections();
254:
255: public abstract void send(Address dest, byte[] data, int offset,
256: int length) throws Exception;
257:
258: public abstract void retainAll(Collection members);
259:
260: /** ConnectionTable.Receiver interface */
261: public void receive(Address sender, byte[] data, int offset,
262: int length) {
263: receive(local_addr, sender, data, offset, length);
264: }
265:
266: protected void handleDownEvent(Event evt) {
267: super .handleDownEvent(evt);
268: if (evt.getType() == Event.VIEW_CHANGE) {
269: suspected_mbrs.removeAll();
270: View v = (View) evt.getArg();
271: Vector tmp_mbrs = v != null ? v.getMembers() : null;
272: if (tmp_mbrs != null) {
273: retainAll(tmp_mbrs); // remove all connections from the ConnectionTable which are not members
274: }
275: } else if (evt.getType() == Event.UNSUSPECT) {
276: suspected_mbrs.removeElement(evt.getArg());
277: }
278: }
279: }
|