001: package org.jgroups.tests.perf.transports;
002:
003: import org.jgroups.stack.IpAddress;
004: import org.jgroups.tests.perf.Receiver;
005: import org.jgroups.tests.perf.Transport;
006: import org.jgroups.util.Util;
007: import org.jgroups.Address;
008:
009: import java.io.*;
010: import java.net.*;
011: import java.util.*;
012:
013: /**
014: * @author Bela Ban Jan 22
015: * @author 2004
016: * @version $Id: TcpTransport.java,v 1.15 2005/09/08 14:27:35 belaban Exp $
017: */
018: public class TcpTransport implements Transport {
019: Receiver receiver = null;
020: Properties config = null;
021: int max_receiver_buffer_size = 500000;
022: int max_send_buffer_size = 500000;
023: List nodes;
024: ConnectionTable ct;
025: int start_port = 7800;
026: ServerSocket srv_sock = null;
027: InetAddress bind_addr = null;
028: IpAddress local_addr = null;
029: List receivers = new ArrayList();
030:
031: public TcpTransport() {
032: }
033:
034: public Object getLocalAddress() {
035: return local_addr;
036: }
037:
038: public void create(Properties properties) throws Exception {
039: this .config = properties;
040: String tmp;
041: if ((tmp = config.getProperty("srv_port")) != null)
042: start_port = Integer.parseInt(tmp);
043: else if ((tmp = config.getProperty("start_port")) != null)
044: start_port = Integer.parseInt(tmp);
045:
046: String bind_addr_str = System.getProperty("udp.bind_addr",
047: config.getProperty("bind_addr"));
048: if (bind_addr_str != null) {
049: bind_addr = InetAddress.getByName(bind_addr_str);
050: } else
051: bind_addr = InetAddress.getLocalHost();
052:
053: String cluster_def = config.getProperty("cluster");
054: if (cluster_def == null)
055: throw new Exception(
056: "TcpTransport.create(): property 'cluster' is not defined");
057: nodes = parseCommaDelimitedList(cluster_def);
058: ct = new ConnectionTable(nodes);
059: }
060:
061: public void start() throws Exception {
062: srv_sock = Util.createServerSocket(bind_addr, start_port);
063: local_addr = new IpAddress(srv_sock.getInetAddress(), srv_sock
064: .getLocalPort());
065: ct.init();
066:
067: // accept connections and start 1 Receiver per connection
068: Thread acceptor = new Thread() {
069: public void run() {
070: while (true) {
071: try {
072: Socket s = srv_sock.accept();
073: ReceiverThread r = new ReceiverThread(s);
074: r.setDaemon(true);
075: receivers.add(r);
076: r.start();
077: } catch (Exception ex) {
078: ex.printStackTrace();
079: break;
080: }
081: }
082: }
083: };
084: acceptor.setDaemon(true);
085: acceptor.start();
086: }
087:
088: public void stop() {
089: ct.close();
090: for (Iterator it = receivers.iterator(); it.hasNext();) {
091: ReceiverThread thread = (ReceiverThread) it.next();
092: thread.stopThread();
093: }
094: }
095:
096: public void destroy() {
097: ;
098: }
099:
100: public void setReceiver(Receiver r) {
101: this .receiver = r;
102: }
103:
104: public Map dumpStats() {
105: return null;
106: }
107:
108: public void send(Object destination, byte[] payload)
109: throws Exception {
110: if (destination != null)
111: throw new Exception(
112: "TcpTransport.send(): unicasts not supported");
113: ct.writeMessage(payload);
114: }
115:
116: class ConnectionTable {
117: /** List<InetSocketAddress> */
118: List myNodes;
119: final Connection[] connections;
120:
121: ConnectionTable(List nodes) throws Exception {
122: this .myNodes = nodes;
123: connections = new Connection[nodes.size()];
124: }
125:
126: void init() throws Exception {
127: int i = 0;
128:
129: for (Iterator it = myNodes.iterator(); it.hasNext();) {
130: InetSocketAddress addr = (InetSocketAddress) it.next();
131: if (connections[i] == null) {
132: try {
133: connections[i] = new Connection(addr);
134: connections[i].createSocket();
135: System.out.println("-- connected to " + addr);
136: } catch (ConnectException connect_ex) {
137: System.out.println("-- failed to connect to "
138: + addr);
139: } catch (Exception all_others) {
140: throw all_others;
141: }
142: }
143: i++;
144: }
145: }
146:
147: // todo: parallelize
148: void writeMessage(byte[] msg) throws Exception {
149: for (int i = 0; i < connections.length; i++) {
150: Connection c = connections[i];
151: if (c != null) {
152: try {
153: c.writeMessage(msg);
154: } catch (Exception e) {
155: // System.err.println("failed sending msg on " + c);
156: }
157: }
158: }
159: }
160:
161: void close() {
162: for (int i = 0; i < connections.length; i++) {
163: Connection c = connections[i];
164: if (c != null)
165: c.close();
166: }
167: }
168:
169: public String toString() {
170: StringBuffer sb = new StringBuffer();
171: for (Iterator it = myNodes.iterator(); it.hasNext();) {
172: sb.append(it.next()).append(' ');
173: }
174: return sb.toString();
175: }
176: }
177:
178: class Connection {
179: Socket sock = null;
180: DataOutputStream out;
181: InetSocketAddress to;
182: final Object mutex = new Object();
183:
184: Connection(InetSocketAddress addr) {
185: this .to = addr;
186: }
187:
188: void createSocket() throws IOException {
189: sock = new Socket(to.getAddress(), to.getPort());
190: sock.setSendBufferSize(max_send_buffer_size);
191: sock.setReceiveBufferSize(max_receiver_buffer_size);
192: out = new DataOutputStream(new BufferedOutputStream(sock
193: .getOutputStream()));
194: Util.writeAddress(local_addr, out);
195: }
196:
197: void writeMessage(byte[] msg) throws Exception {
198: synchronized (mutex) {
199: if (sock == null) {
200: createSocket();
201: }
202: out.writeInt(msg.length);
203: out.write(msg, 0, msg.length);
204: }
205: out.flush();
206: }
207:
208: void close() {
209: try {
210: out.flush();
211: sock.close();
212: } catch (Exception ex) {
213: }
214: }
215:
216: public String toString() {
217: return "Connection from " + local_addr + " to " + to;
218: }
219: }
220:
221: class ReceiverThread extends Thread {
222: Socket sock;
223: DataInputStream in;
224: Address peer_addr;
225:
226: ReceiverThread(Socket sock) throws Exception {
227: this .sock = sock;
228: // sock.setSoTimeout(5000);
229: in = new DataInputStream(new BufferedInputStream(sock
230: .getInputStream()));
231: // in=new DataInputStream(sock.getInputStream());
232:
233: peer_addr = Util.readAddress(in);
234: // System.out.println("-- ACCEPTED connection from " + peer_addr);
235: }
236:
237: public void run() {
238: while (sock != null) {
239: try {
240: int len = in.readInt();
241: byte[] buf = new byte[len];
242: in.readFully(buf, 0, len);
243: // System.out.println("-- received data from " + peer_addr);
244: if (receiver != null)
245: receiver.receive(peer_addr, buf);
246: } catch (EOFException eof) {
247: break;
248: } catch (Exception ex) {
249: break;
250: }
251: }
252: System.out.println("-- receiver thread for " + peer_addr
253: + " terminated");
254: }
255:
256: void stopThread() {
257: try {
258: sock.close();
259: sock = null;
260: } catch (Exception ex) {
261: }
262: }
263: }
264:
265: public List parseCommaDelimitedList(String s) throws Exception {
266: List retval = new ArrayList();
267: StringTokenizer tok;
268: String hostname, tmp;
269: int port;
270: InetSocketAddress addr;
271: int index;
272:
273: if (s == null)
274: return null;
275: tok = new StringTokenizer(s, ",");
276: while (tok.hasMoreTokens()) {
277: tmp = tok.nextToken();
278: index = tmp.indexOf(':');
279: if (index == -1)
280: throw new Exception(
281: "host must be in format <host:port>, was "
282: + tmp);
283: hostname = tmp.substring(0, index);
284: port = Integer.parseInt(tmp.substring(index + 1));
285: addr = new InetSocketAddress(hostname, port);
286: retval.add(addr);
287: }
288: return retval;
289: }
290:
291: }
|