001: package org.jgroups.stack;
002:
003: import org.apache.commons.logging.Log;
004: import org.apache.commons.logging.LogFactory;
005: import org.jgroups.Address;
006: import org.jgroups.util.Util;
007:
008: import java.io.*;
009: import java.net.InetAddress;
010: import java.net.Socket;
011: import java.net.SocketAddress;
012: import java.net.InetSocketAddress;
013: import java.util.*;
014:
015: /**
016: * Local stub for clients to access one (or more) GossipRouters. Will use proprietary protocol
017: * (using GossipData PDUs) based on TCP to connect to GossipRouter.<p>
018: * Requires JDK >= 1.3 due to the use of Timer.
019: *
020: * @author Bela Ban Oct 4 2001
021: * @version $Id: GossipClient.java,v 1.16 2006/10/25 08:23:58 belaban Exp $
022: */
023: public class GossipClient {
024: Timer timer = new Timer(true);
025:
026: /** Hashtable<String,List<Address>> */
027: final Hashtable groups = new Hashtable(); // groups - List of Addresses
028: private Refresher refresher_task = new Refresher();
029: final Vector gossip_servers = new Vector(); // a list of GossipRouters (IpAddress)
030: boolean timer_running = false;
031: boolean refresher_enabled = true;
032: long EXPIRY_TIME = 20000; // must be less than in GossipRouter
033: final int SOCKET_TIMEOUT = 5000; // max number of ms to wait for socket establishment to GossipRouter
034:
035: protected final Log log = LogFactory.getLog(this .getClass());
036:
037: /**
038: * Creates the GossipClient
039: * @param gossip_host The address and port of the host on which the GossipRouter is running
040: * @param expiry Interval (in msecs) for the refresher task
041: */
042: public GossipClient(IpAddress gossip_host, long expiry) {
043: init(gossip_host, expiry);
044: }
045:
046: /**
047: Creates the GossipClient
048: @param gossip_hosts List of IpAddresses
049: @param expiry Interval (in msecs) for the refresher task
050: */
051: public GossipClient(Vector gossip_hosts, long expiry) {
052: if (gossip_hosts == null) {
053: if (log.isErrorEnabled())
054: log.error("empty set of GossipRouters given");
055: return;
056: }
057: for (int i = 0; i < gossip_hosts.size(); i++)
058: init((IpAddress) gossip_hosts.elementAt(i), expiry);
059: }
060:
061: public boolean isRefresherEnabled() {
062: return refresher_enabled;
063: }
064:
065: public void setRefresherEnabled(boolean refresher_enabled) {
066: this .refresher_enabled = refresher_enabled;
067: }
068:
069: public void stop() {
070: timer_running = false;
071: if (refresher_task != null)
072: refresher_task.cancel();
073: timer.cancel();
074: groups.clear();
075: // provide another refresh tools in case the channel gets reconnected
076: // timer=new Timer();
077: // refresher_task=new Refresher();
078:
079: }
080:
081: public void destroy() {
082: timer_running = false;
083: timer.cancel();
084: groups.clear();
085: }
086:
087: /**
088: * Adds a GossipRouter to be accessed.
089: */
090: public void addGossipRouter(IpAddress gossip_host) {
091: if (!gossip_servers.contains(gossip_host))
092: gossip_servers.addElement(gossip_host);
093: }
094:
095: /**
096: Adds the member to the given group. If the group already has an entry for the member,
097: its timestamp will be updated, preventing the cache cleaner from removing the entry.<p>
098: The entry will be registered <em>with all GossipRouters that GossipClient is configured to access</em>
099: */
100: public void register(String group, Address mbr) {
101: if (group == null || mbr == null) {
102: if (log.isErrorEnabled())
103: log.error("group or mbr is null");
104: return;
105: }
106:
107: List mbrs = (List) groups.get(group);
108: if (mbrs == null) {
109: mbrs = new LinkedList();
110: mbrs.add(mbr);
111: groups.put(group, mbrs);
112: } else {
113: if (!mbrs.contains(mbr))
114: mbrs.add(mbr);
115: }
116:
117: _register(group, mbr); // update entry in GossipRouter
118:
119: if (refresher_enabled) {
120: if (!timer_running) {
121: timer = new Timer(true);
122: refresher_task = new Refresher();
123: timer
124: .schedule(refresher_task, EXPIRY_TIME,
125: EXPIRY_TIME);
126: timer_running = true;
127: }
128: }
129: }
130:
131: public void unregister(String group, Address mbr) {
132: if (group == null || mbr == null) {
133: if (log.isErrorEnabled())
134: log.error("group or mbr is null");
135: return;
136: }
137:
138: _unregister(group, mbr); // remove entry from GossipRouter
139: }
140:
141: /**
142: Returns all members of a given group
143: @param group The group name
144: @return List A list of Addresses
145: */
146: public List getMembers(String group) {
147: if (group == null) {
148: if (log.isErrorEnabled())
149: log.error("group is null");
150: return null;
151: }
152: List result = _getMembers(group);
153: if (log.isTraceEnabled())
154: log.trace("GET(" + group + ") --> " + result);
155: return result;
156: }
157:
158: /* ------------------------------------- Private methods ----------------------------------- */
159:
160: final void init(IpAddress gossip_host, long expiry) {
161: EXPIRY_TIME = expiry;
162: addGossipRouter(gossip_host);
163: }
164:
165: /**
166: * Registers the group|mbr with *all* GossipRouters.
167: */
168: void _register(String group, Address mbr) {
169: Socket sock = null;
170: DataOutputStream out = null;
171: IpAddress entry;
172: GossipData gossip_req;
173:
174: for (int i = 0; i < gossip_servers.size(); i++) {
175: entry = (IpAddress) gossip_servers.elementAt(i);
176: if (entry.getIpAddress() == null || entry.getPort() == 0) {
177: if (log.isErrorEnabled())
178: log.error("entry.host or entry.port is null");
179: continue;
180: }
181: try {
182: if (log.isTraceEnabled())
183: log.trace("REGISTER(" + group + ", " + mbr
184: + ") with GossipRouter at "
185: + entry.getIpAddress() + ':'
186: + entry.getPort());
187: sock = new Socket(entry.getIpAddress(), entry.getPort());
188: out = new DataOutputStream(sock.getOutputStream());
189: gossip_req = new GossipData(GossipRouter.REGISTER,
190: group, mbr, null);
191: // must send GossipData as fast as possible, otherwise the
192: // request might be rejected
193: gossip_req.writeTo(out);
194: out.flush();
195: } catch (Exception ex) {
196: if (log.isErrorEnabled())
197: log.error("exception connecting to host " + entry);
198: } finally {
199: Util.close(out);
200: if (sock != null) {
201: try {
202: sock.close();
203: } catch (IOException e) {
204: }
205: }
206: }
207: }
208: }
209:
210: void _unregister(String group, Address mbr) {
211: Socket sock = null;
212: DataOutputStream out = null;
213: IpAddress entry;
214: GossipData gossip_req;
215:
216: for (int i = 0; i < gossip_servers.size(); i++) {
217: entry = (IpAddress) gossip_servers.elementAt(i);
218: if (entry.getIpAddress() == null || entry.getPort() == 0) {
219: if (log.isErrorEnabled())
220: log.error("entry.host or entry.port is null");
221: continue;
222: }
223: try {
224: if (log.isTraceEnabled())
225: log.trace("UNREGISTER(" + group + ", " + mbr
226: + ") with GossipRouter at "
227: + entry.getIpAddress() + ':'
228: + entry.getPort());
229: sock = new Socket(entry.getIpAddress(), entry.getPort());
230: out = new DataOutputStream(sock.getOutputStream());
231: gossip_req = new GossipData(GossipRouter.UNREGISTER,
232: group, mbr, null);
233: // must send GossipData as fast as possible, otherwise the
234: // request might be rejected
235: gossip_req.writeTo(out);
236: out.flush();
237: } catch (Exception ex) {
238: if (log.isErrorEnabled())
239: log.error("exception connecting to host " + entry);
240: } finally {
241: Util.close(out);
242: if (sock != null) {
243: try {
244: sock.close();
245: } catch (IOException e) {
246: }
247: }
248: }
249: }
250: }
251:
252: /**
253: * Sends a GET_MBR_REQ to *all* GossipRouters, merges responses.
254: */
255: private List _getMembers(String group) {
256: List ret = new LinkedList();
257: Socket sock = null;
258: SocketAddress destAddr;
259: DataOutputStream out = null;
260: DataInputStream in = null;
261: IpAddress entry;
262: GossipData gossip_req, gossip_rsp;
263: Address mbr;
264:
265: for (int i = 0; i < gossip_servers.size(); i++) {
266: entry = (IpAddress) gossip_servers.elementAt(i);
267: if (entry.getIpAddress() == null || entry.getPort() == 0) {
268: if (log.isErrorEnabled())
269: log.error("entry.host or entry.port is null");
270: continue;
271: }
272:
273: try {
274: // sock=new Socket(entry.getIpAddress(), entry.getPort());
275: sock = new Socket();
276: destAddr = new InetSocketAddress(entry.getIpAddress(),
277: entry.getPort());
278: sock.connect(destAddr, SOCKET_TIMEOUT);
279: out = new DataOutputStream(sock.getOutputStream());
280:
281: gossip_req = new GossipData(GossipRouter.GOSSIP_GET,
282: group, null, null);
283: // must send GossipData as fast as possible, otherwise the
284: // request might be rejected
285: gossip_req.writeTo(out);
286: out.flush();
287:
288: in = new DataInputStream(sock.getInputStream());
289: gossip_rsp = new GossipData();
290: gossip_rsp.readFrom(in);
291: if (gossip_rsp.mbrs != null) { // merge with ret
292: for (Iterator it = gossip_rsp.mbrs.iterator(); it
293: .hasNext();) {
294: mbr = (Address) it.next();
295: if (!ret.contains(mbr))
296: ret.add(mbr);
297: }
298: }
299: } catch (Exception ex) {
300: if (log.isErrorEnabled())
301: log.error("exception connecting to host " + entry);
302: } finally {
303: Util.close(out);
304: Util.close(in);
305: if (sock != null) {
306: try {
307: sock.close();
308: } catch (IOException e) {
309: }
310: }
311: }
312: }
313:
314: return ret;
315: }
316:
317: /* ---------------------------------- End of Private methods ------------------------------- */
318:
319: /**
320: * Periodically iterates through groups and refreshes all registrations with GossipRouter
321: */
322: private class Refresher extends TimerTask {
323:
324: public void run() {
325: int num_items = 0;
326: String group;
327: List mbrs;
328: Address mbr;
329:
330: if (log.isTraceEnabled())
331: log.trace("refresher task is run");
332: for (Enumeration e = groups.keys(); e.hasMoreElements();) {
333: group = (String) e.nextElement();
334: mbrs = (List) groups.get(group);
335: if (mbrs != null) {
336: for (Iterator it = mbrs.iterator(); it.hasNext();) {
337: mbr = (Address) it.next();
338: if (log.isTraceEnabled())
339: log.trace("registering " + group + " : "
340: + mbr);
341: register(group, mbr);
342: num_items++;
343: }
344: }
345: }
346: if (log.isTraceEnabled())
347: log.trace("refresher task done. Registered "
348: + num_items + " items");
349: }
350:
351: }
352:
353: public static void main(String[] args) {
354: Vector gossip_hosts = new Vector();
355: String host;
356: InetAddress ip_addr;
357: int port;
358: boolean get = false, register = false, keep_running = false;
359: String register_host = null;
360: int register_port = 0;
361: String get_group = null, register_group = null;
362: GossipClient gossip_client = null;
363: List mbrs;
364: long expiry = 20000;
365:
366: for (int i = 0; i < args.length; i++) {
367: if ("-help".equals(args[i])) {
368: usage();
369: return;
370: }
371: if ("-expiry".equals(args[i])) {
372: expiry = Long.parseLong(args[++i]);
373: continue;
374: }
375: if ("-host".equals(args[i])) {
376: host = args[++i];
377: port = Integer.parseInt(args[++i]);
378: try {
379: ip_addr = InetAddress.getByName(host);
380: gossip_hosts
381: .addElement(new IpAddress(ip_addr, port));
382: } catch (Exception ex) {
383: System.err.println(ex);
384: }
385: continue;
386: }
387: if ("-keep_running".equals(args[i])) {
388: keep_running = true;
389: continue;
390: }
391: if ("-get".equals(args[i])) {
392: get = true;
393: get_group = args[++i];
394: continue;
395: }
396: if ("-register".equals(args[i])) {
397: register_group = args[++i];
398: register_host = args[++i];
399: register_port = Integer.parseInt(args[++i]);
400: register = true;
401: continue;
402: }
403: usage();
404: return;
405: }
406:
407: if (gossip_hosts.size() == 0) {
408: System.err
409: .println("At least 1 GossipRouter has to be given");
410: return;
411: }
412:
413: if (!register && !get) {
414: System.err
415: .println("Neither get nor register command given, will not do anything");
416: return;
417: }
418:
419: try {
420: gossip_client = new GossipClient(gossip_hosts, expiry);
421: if (register) {
422: System.out
423: .println("Registering " + register_group
424: + " --> " + register_host + ':'
425: + register_port);
426: gossip_client.register(register_group, new IpAddress(
427: register_host, register_port));
428: }
429:
430: if (get) {
431: System.out.println("Getting members for group "
432: + get_group);
433: mbrs = gossip_client.getMembers(get_group);
434: System.out.println("Members for group " + get_group
435: + " are " + mbrs);
436: }
437: } catch (Exception ex) {
438: System.err.println(ex);
439: }
440: if (!keep_running)
441: gossip_client.stop();
442: }
443:
444: static void usage() {
445: System.out
446: .println("GossipClient [-help] [-host <hostname> <port>]+ "
447: + " [-get <groupname>] [-register <groupname hostname port>] [-expiry <msecs>] "
448: + "[-keep_running]]");
449: }
450:
451: }
|