001: package org.jgroups.stack;
002:
003: import org.apache.commons.logging.Log;
004: import org.apache.commons.logging.LogFactory;
005: import org.jgroups.Address;
006: import org.jgroups.Message;
007: import org.jgroups.protocols.TunnelHeader;
008: import org.jgroups.util.Buffer;
009: import org.jgroups.util.ExposedByteArrayOutputStream;
010: import org.jgroups.util.Util;
011:
012: import java.io.ByteArrayInputStream;
013: import java.io.DataInputStream;
014: import java.io.DataOutputStream;
015: import java.net.Socket;
016: import java.net.InetAddress;
017: import java.net.DatagramSocket;
018: import java.net.SocketException;
019: import java.util.List;
020:
021: /**
022: * Client stub that talks to a remote GossipRouter
023: * @author Bela Ban
024: * @version $Id: RouterStub.java,v 1.22 2006/10/25 08:23:58 belaban Exp $
025: */
026: public class RouterStub {
027: String router_host = null; // name of the router host
028: int router_port = 0; // port on which router listens on router_host
029: Socket sock = null; // socket connecting to the router
030: private ExposedByteArrayOutputStream out_stream = new ExposedByteArrayOutputStream(
031: 512);
032: DataOutputStream output = null; // output stream associated with sock
033: DataInputStream input = null; // input stream associated with sock
034: DatagramSocket my_sock = null; // needed to generate an ID that's unqique across this host
035: Address local_addr = null; // addr of group mbr. Once assigned, remains the same
036: static final long RECONNECT_TIMEOUT = 5000; // msecs to wait until next connection retry attempt
037: private volatile boolean connected = false;
038: private volatile boolean reconnect = false; // controls reconnect() loop
039: protected static final Log log = LogFactory
040: .getLog(RouterStub.class);
041: protected ConnectionListener conn_listener;
042: private String groupname = null;
043: private InetAddress bind_addr = null;
044:
045: public interface ConnectionListener {
046: void connectionStatusChange(boolean connected);
047: }
048:
049: public RouterStub() {
050: }
051:
052: /**
053: Creates a stub for a remote Router object.
054: @param router_host The name of the router's host
055: @param router_port The router's port
056: */
057: public RouterStub(String router_host, int router_port) {
058: this .router_host = router_host != null ? router_host
059: : "localhost";
060: this .router_port = router_port;
061: }
062:
063: public InetAddress getBindAddress() {
064: return bind_addr;
065: }
066:
067: public void setBindAddress(InetAddress bind_addr) {
068: this .bind_addr = bind_addr;
069: }
070:
071: public String getRouterHost() {
072: return router_host;
073: }
074:
075: public void setRouterHost(String router_host) {
076: this .router_host = router_host;
077: }
078:
079: public int getRouterPort() {
080: return router_port;
081: }
082:
083: public void setRouterPort(int router_port) {
084: this .router_port = router_port;
085: }
086:
087: public boolean isConnected() {
088: return connected;
089: }
090:
091: public void setConnectionListener(ConnectionListener conn_listener) {
092: this .conn_listener = conn_listener;
093: }
094:
095: public synchronized Address getLocalAddress()
096: throws SocketException {
097: if (local_addr == null)
098: local_addr = generateLocalAddress();
099: return local_addr;
100: }
101:
102: private synchronized Address generateLocalAddress()
103: throws SocketException {
104: my_sock = new DatagramSocket(0, bind_addr);
105: local_addr = new IpAddress(bind_addr, my_sock.getLocalPort());
106: return local_addr;
107: }
108:
109: // private synchronized long generateUniquePort() {
110: // long ret=System.currentTimeMillis();
111: // if(ret <= last_port) {
112: // ret=++last_port;
113: // }
114: // else {
115: // last_port=ret;
116: // }
117: // return ret;
118: // }
119:
120: /**
121: Register this process with the router under <code>groupname</code>.
122: @param groupname The name of the group under which to register
123: */
124: public synchronized void connect(String groupname) throws Exception {
125: if (groupname == null || groupname.length() == 0)
126: throw new Exception("groupname is null");
127:
128: this .groupname = groupname;
129:
130: if (local_addr == null)
131: local_addr = generateLocalAddress();
132:
133: try {
134: sock = new Socket(router_host, router_port, bind_addr, 0);
135: sock.setSoLinger(true, 500);
136: output = new DataOutputStream(sock.getOutputStream());
137: GossipData req = new GossipData(GossipRouter.CONNECT,
138: groupname, local_addr, null);
139: req.writeTo(output);
140: output.flush();
141: input = new DataInputStream(sock.getInputStream()); // retrieve our own address by reading it from the socket
142: setConnected(true);
143: } catch (Exception e) {
144: if (log.isWarnEnabled())
145: log.warn("failed connecting to " + router_host + ":"
146: + router_port);
147: Util.close(sock);
148: Util.close(input);
149: Util.close(output);
150: setConnected(false);
151: throw e;
152: }
153: }
154:
155: public void connect(String groupname, String router_host,
156: int router_port) throws Exception {
157: setRouterHost(router_host);
158: setRouterPort(router_port);
159: connect(groupname);
160: }
161:
162: /** Closes the socket and the input and output streams associated with it */
163: public synchronized void disconnect() {
164: disconnect(false);
165: }
166:
167: public synchronized void disconnect(boolean is_reconnect) {
168: try {
169: if (sock == null || output == null || input == null) {
170: setConnected(false);
171: return;
172: }
173:
174: if (groupname == null || groupname.length() == 0) {
175: if (log.isErrorEnabled())
176: log.error("groupname is null");
177: return;
178: }
179:
180: if (local_addr == null) {
181: if (log.isErrorEnabled())
182: log.error("local_addr is null");
183: return;
184: }
185:
186: GossipData req = new GossipData(GossipRouter.DISCONNECT,
187: groupname, local_addr, null);
188: req.writeTo(output);
189: setConnected(false);
190: } catch (Exception e) {
191: // if(log.isErrorEnabled()) log.error("failed unregistering " + local_addr, e);
192: } finally {
193: Util.close(output);
194: Util.close(input);
195: Util.close(sock);
196: sock = null;
197: setConnected(false);
198: // stop the TUNNEL receiver thread
199: reconnect = false;
200: if (is_reconnect) {
201: Util.close(my_sock);
202: local_addr = null;
203: }
204: }
205: }
206:
207: /**
208: Retrieves the membership (list of Addresses) for a given group. This is mainly used by the PING
209: protocol to obtain its initial membership. This is used infrequently, so don't maintain socket
210: for the entire time, but create/delete it on demand.
211: */
212: public List get(String groupname) {
213: List ret = null;
214: Socket tmpsock = null;
215: DataOutputStream tmpOutput = null;
216: DataInputStream tmpInput = null;
217:
218: if (groupname == null || groupname.length() == 0) {
219: if (log.isErrorEnabled())
220: log.error("groupname is null");
221: return null;
222: }
223:
224: try {
225: tmpsock = new Socket(router_host, router_port);
226: tmpsock.setSoLinger(true, 500);
227:
228: // request membership for groupname
229: tmpOutput = new DataOutputStream(tmpsock.getOutputStream());
230: GossipData request = new GossipData(
231: GossipRouter.ROUTER_GET, groupname, null, null);
232: request.writeTo(tmpOutput);
233:
234: tmpInput = new DataInputStream(tmpsock.getInputStream());
235: GossipData response = new GossipData();
236: response.readFrom(tmpInput);
237: return response.getMembers();
238: } catch (Exception e) {
239: if (log.isErrorEnabled())
240: log.error("exception=" + e);
241: } finally {
242: Util.close(tmpOutput);
243: Util.close(tmpInput);
244: Util.close(tmpsock);
245: }
246: return ret;
247: }
248:
249: /** Sends a message to the router. Returns false if message cannot be sent (e.g. no connection to
250: router, true otherwise. */
251: public boolean send(Message msg, String groupname) {
252: Address dst_addr = null;
253:
254: if (sock == null || output == null || input == null) {
255: if (log.isErrorEnabled())
256: log.error("no connection to router (groupname="
257: + groupname + ')');
258: setConnected(false);
259: return false;
260: }
261:
262: if (msg == null) {
263: if (log.isErrorEnabled())
264: log.error("message is null");
265: return false;
266: }
267:
268: try {
269: dst_addr = msg.getDest(); // could be null in case of mcast
270: try {
271: out_stream.reset();
272: } catch (Exception ex) {
273: out_stream = new ExposedByteArrayOutputStream(512);
274: }
275: // at this point out_stream is always valid and non-null
276:
277: DataOutputStream tmp = new DataOutputStream(out_stream);
278: msg.writeTo(tmp);
279: tmp.close();
280: Buffer buf = new Buffer(out_stream.getRawBuffer(), 0,
281: out_stream.size());
282:
283: // 1. Group name
284: output.writeUTF(groupname);
285:
286: // 2. Destination address
287: Util.writeAddress(dst_addr, output);
288:
289: // 3. Length of byte buffer
290: output.writeInt(buf.getLength());
291:
292: // 4. Byte buffer
293: output.write(buf.getBuf(), 0, buf.getLength());
294: } catch (Exception e) {
295: if (log.isErrorEnabled())
296: log.error("failed sending message to " + dst_addr, e);
297: setConnected(false);
298: return false;
299: }
300: return true;
301: }
302:
303: /** Receives a message from the router (blocking mode). If the connection is down,
304: false is returned, otherwise true */
305: public Message receive() throws Exception {
306: Message ret = null;
307: byte[] buf = null;
308: int len;
309:
310: if (sock == null || output == null || input == null) {
311: // if(log.isErrorEnabled()) log.error("no connection to router");
312: setConnected(false);
313: return null;
314: }
315: Address dest;
316: try {
317: dest = Util.readAddress(input);
318: len = input.readInt();
319: if (len == 0) {
320: ret = null;
321: } else {
322: buf = new byte[len];
323: input.readFully(buf, 0, len);
324: ret = new Message(false);
325: ByteArrayInputStream tmp = new ByteArrayInputStream(buf);
326: DataInputStream in = new DataInputStream(tmp);
327: ret.readFrom(in);
328: ret.setDest(dest);
329: in.close();
330: }
331: if (log.isTraceEnabled())
332: log.trace("received " + ret);
333: return ret;
334: } catch (Exception e) {
335: setConnected(false);
336: throw e;
337: }
338: }
339:
340: /** Tries to establish connection to router. Tries until router is up again. */
341: public void reconnect(int max_attempts) throws Exception {
342: int num_atttempts = 0;
343:
344: if (connected)
345: return;
346: disconnect();
347: reconnect = true;
348: while (reconnect
349: && (num_atttempts++ < max_attempts || max_attempts == -1)) {
350: try {
351: connect(groupname);
352: break;
353: } catch (Exception ex) { // this is a normal case
354: if (log.isTraceEnabled())
355: log.trace("failed reconnecting", ex);
356: }
357: if (max_attempts == -1)
358: Util.sleep(RECONNECT_TIMEOUT);
359: }
360: if (!connected)
361: throw new Exception("reconnect failed");
362: if (log.isTraceEnabled())
363: log.trace("client reconnected");
364: }
365:
366: public void reconnect() throws Exception {
367: reconnect(-1);
368: }
369:
370: private void notifyConnectionListener(boolean connected) {
371: if (conn_listener != null) {
372: conn_listener.connectionStatusChange(connected);
373: }
374: }
375:
376: private void setConnected(boolean connected) {
377: boolean notify = this .connected != connected;
378: this .connected = connected;
379: if (notify) {
380: try {
381: notifyConnectionListener(this .connected);
382: } catch (Throwable t) {
383: log.error("failed notifying ConnectionListener "
384: + conn_listener, t);
385: }
386: }
387: }
388:
389: public static void main(String[] args) {
390: if (args.length != 2) {
391: System.out.println("RouterStub <host> <port>");
392: return;
393: }
394: RouterStub stub = new RouterStub(args[0], Integer
395: .parseInt(args[1])), stub2 = new RouterStub(args[0],
396: Integer.parseInt(args[1]));
397: Address my_addr;
398: boolean rc;
399: final String groupname = "BelaGroup";
400: Message msg;
401: List mbrs;
402:
403: try {
404: System.out.println("Registering under " + groupname);
405: stub.connect(groupname);
406: System.out.println("My address is "
407: + stub.getLocalAddress());
408: my_addr = stub2.getLocalAddress();
409: stub2.connect(groupname);
410: System.out
411: .println("Getting members of " + groupname + ": ");
412: mbrs = stub.get(groupname);
413: System.out.println("Done, mbrs are " + mbrs);
414:
415: for (int i = 1; i <= 10; i++) {
416: msg = new Message(null, my_addr, "Bela #" + i);
417: msg.putHeader("TUNNEL", new TunnelHeader(groupname));
418: rc = stub2.send(msg, groupname);
419: System.out.println("Sent msg #" + i + ", rc=" + rc);
420: }
421:
422: for (int i = 0; i < 10; i++) {
423: msg = stub.receive();
424: System.out.println("Received msg " + msg.getObject());
425: }
426: } catch (Exception ex) {
427: log.error(ex);
428: } finally {
429: stub.disconnect();
430: }
431: }
432:
433: }
|