001: // $Id: GossipRouter.java,v 1.22.2.1 2007/04/18 05:50:10 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008: import org.jgroups.Address;
009: import org.jgroups.conf.ClassConfigurator;
010: import org.jgroups.util.Util;
011:
012: import java.io.DataInputStream;
013: import java.io.DataOutputStream;
014: import java.io.IOException;
015: import java.net.InetAddress;
016: import java.net.ServerSocket;
017: import java.net.Socket;
018: import java.util.*;
019:
020: /**
021: * Router for TCP based group comunication (using layer TCP instead of UDP).
022: * Instead of the TCP layer sending packets point-to-point to each other
023: * member, it sends the packet to the router which - depending on the target
024: * address - multicasts or unicasts it to the group / or single member.<p>
025: * This class is especially interesting for applets which cannot directly make
026: * connections (neither UDP nor TCP) to a host different from the one they were
027: * loaded from. Therefore, an applet would create a normal channel plus
028: * protocol stack, but the bottom layer would have to be the TCP layer which
029: * sends all packets point-to-point (over a TCP connection) to the router,
030: * which in turn forwards them to their end location(s) (also over TCP). A
031: * centralized router would therefore have to be running on the host the applet
032: * was loaded from.<p>
033: * An alternative for running JGroups in an applet (IP multicast is not allows
034: * in applets as of 1.2), is to use point-to-point UDP communication via the
035: * gossip server. However, then the appplet has to be signed which involves
036: * additional administrative effort on the part of the user.<p>
037: * @author Bela Ban
038: * @author Ovidiu Feodorov <ovidiuf@users.sourceforge.net>
039: * @since 2.1.1
040: */
041: public class GossipRouter {
042: public static final byte CONNECT = 1; // CONNECT(group, addr) --> local address
043: public static final byte DISCONNECT = 2; // DISCONNECT(group, addr)
044: public static final byte REGISTER = 3; // REGISTER(group, addr)
045: public static final byte GOSSIP_GET = 4; // GET(group) --> List<addr> (members)
046: public static final byte ROUTER_GET = 5; // GET(group) --> List<addr> (members)
047: public static final byte GET_RSP = 6; // GET_RSP(List<addr>)
048: public static final byte UNREGISTER = 7; // UNREGISTER(group, addr)
049: public static final byte DUMP = 8; // DUMP
050: public static final byte SHUTDOWN = 9;
051:
052: public static final int PORT = 8980;
053: public static final long EXPIRY_TIME = 30000;
054: public static final long GOSSIP_REQUEST_TIMEOUT = 1000;
055: public static final long ROUTING_CLIENT_REPLY_TIMEOUT = 120000;
056:
057: private int port;
058: private String bindAddressString;
059:
060: // time (in msecs) until a cached 'gossip' member entry expires
061: private long expiryTime;
062:
063: // number of millisecs the main thread waits to receive a gossip request
064: // after connection was established; upon expiration, the router initiates
065: // the routing protocol on the connection. Don't set the interval too big,
066: // otherwise the router will appear slow in answering routing requests.
067: private long gossipRequestTimeout;
068:
069: // time (in ms) main thread waits for a router client to send the routing
070: // request type and the group afiliation before it declares the request
071: // failed.
072: private long routingClientReplyTimeout;
073:
074: // HashMap<String, Map<Address,AddressEntry> >. Maintains associations between groups and their members. Keys=group
075: // names, values = maps of logical address / AddressEntry associations
076: private final Map routingTable = new ConcurrentHashMap();
077:
078: private ServerSocket srvSock = null;
079: private InetAddress bindAddress = null;
080:
081: private boolean up = true;
082:
083: /** whether to discard message sent to self */
084: private boolean discard_loopbacks = false;
085:
086: // the cache sweeper
087: Timer timer = null;
088:
089: protected final Log log = LogFactory.getLog(this .getClass());
090:
091: //
092: // JMX INSTRUMENTATION - MANAGEMENT INTERFACE
093: //
094:
095: public GossipRouter() {
096: this (PORT);
097: }
098:
099: public GossipRouter(int port) {
100: this (port, null);
101: }
102:
103: public GossipRouter(int port, String bindAddressString) {
104: this (port, bindAddressString, EXPIRY_TIME);
105: }
106:
107: public GossipRouter(int port, String bindAddressString,
108: long expiryTime) {
109: this (port, bindAddressString, expiryTime,
110: GOSSIP_REQUEST_TIMEOUT, ROUTING_CLIENT_REPLY_TIMEOUT);
111: }
112:
113: public GossipRouter(int port, String bindAddressString,
114: long expiryTime, long gossipRequestTimeout,
115: long routingClientReplyTimeout) {
116: this .port = port;
117: this .bindAddressString = bindAddressString;
118: this .expiryTime = expiryTime;
119: this .gossipRequestTimeout = gossipRequestTimeout;
120: this .routingClientReplyTimeout = routingClientReplyTimeout;
121: }
122:
123: //
124: // MANAGED ATTRIBUTES
125: //
126:
127: public void setPort(int port) {
128: this .port = port;
129: }
130:
131: public int getPort() {
132: return port;
133: }
134:
135: public void setBindAddress(String bindAddress) {
136: bindAddressString = bindAddress;
137: }
138:
139: public String getBindAddress() {
140: return bindAddressString;
141: }
142:
143: public void setExpiryTime(long expiryTime) {
144: this .expiryTime = expiryTime;
145: }
146:
147: public long getExpiryTime() {
148: return expiryTime;
149: }
150:
151: public void setGossipRequestTimeout(long gossipRequestTimeout) {
152: this .gossipRequestTimeout = gossipRequestTimeout;
153: }
154:
155: public long getGossipRequestTimeout() {
156: return gossipRequestTimeout;
157: }
158:
159: public void setRoutingClientReplyTimeout(
160: long routingClientReplyTimeout) {
161: this .routingClientReplyTimeout = routingClientReplyTimeout;
162: }
163:
164: public long getRoutingClientReplyTimeout() {
165: return routingClientReplyTimeout;
166: }
167:
168: public boolean isStarted() {
169: return srvSock != null;
170: }
171:
172: public boolean isDiscardLoopbacks() {
173: return discard_loopbacks;
174: }
175:
176: public void setDiscardLoopbacks(boolean discard_loopbacks) {
177: this .discard_loopbacks = discard_loopbacks;
178: }
179:
180: public static String type2String(int type) {
181: switch (type) {
182: case CONNECT:
183: return "CONNECT";
184: case DISCONNECT:
185: return "DISCONNECT";
186: case REGISTER:
187: return "REGISTER";
188: case GOSSIP_GET:
189: return "GOSSIP_GET";
190: case ROUTER_GET:
191: return "ROUTER_GET";
192: case GET_RSP:
193: return "GET_RSP";
194: case UNREGISTER:
195: return "UNREGISTER";
196: case DUMP:
197: return "DUMP";
198: case SHUTDOWN:
199: return "SHUTDOWN";
200: default:
201: return "unknown";
202: }
203: }
204:
205: //
206: // JBoss MBean LIFECYCLE OPERATIONS
207: //
208:
209: /**
210: * JBoss MBean lifecycle operation.
211: */
212: public void create() throws Exception {
213: // not used
214: }
215:
216: /**
217: * JBoss MBean lifecycle operation. Called after create(). When this method
218: * is called, the managed attributes have already been set.<br>
219: * Brings the Router in fully functional state.
220: */
221: public void start() throws Exception {
222: if (srvSock != null) {
223: throw new Exception("Router already started.");
224: }
225:
226: if (bindAddressString != null) {
227: bindAddress = InetAddress.getByName(bindAddressString);
228: srvSock = new ServerSocket(port, 50, bindAddress);
229: } else {
230: srvSock = new ServerSocket(port, 50);
231: }
232:
233: up = true;
234:
235: // start the main server thread
236: new Thread(new Runnable() {
237: public void run() {
238: mainLoop();
239: cleanup();
240: }
241: }, "GossipRouter").start();
242:
243: // starts the cache sweeper as daemon thread, so we won't block on it
244: // upon termination
245: timer = new Timer(true);
246: timer.schedule(new TimerTask() {
247: public void run() {
248: sweep();
249: }
250: }, expiryTime, expiryTime);
251: }
252:
253: /**
254: * JBoss MBean lifecycle operation. The JMX agent allways calls this method
255: * before destroy(). Close connections and frees resources.
256: */
257: public void stop() {
258: up = false;
259:
260: if (srvSock == null) {
261: if (log.isWarnEnabled())
262: log.warn("router already stopped");
263: return;
264: }
265:
266: timer.cancel();
267: shutdown();
268: try {
269: srvSock.close();
270: } catch (Exception e) {
271: if (log.isErrorEnabled())
272: log.error("Failed to close server socket: " + e);
273: }
274: // exiting the mainLoop will clean the tables
275: srvSock = null;
276: if (log.isInfoEnabled())
277: log.info("router stopped");
278: }
279:
280: /**
281: * JBoss MBean lifecycle operation.
282: */
283: public void destroy() {
284: // not used
285: }
286:
287: //
288: // ORDINARY OPERATIONS
289: //
290:
291: public String dumpRoutingTable() {
292: String label = "routing";
293: StringBuffer sb = new StringBuffer();
294:
295: if (routingTable.isEmpty()) {
296: sb.append("empty ").append(label).append(" table");
297: } else {
298: for (Iterator i = routingTable.keySet().iterator(); i
299: .hasNext();) {
300: String gname = (String) i.next();
301: sb.append("GROUP: '" + gname + "'\n");
302: Map map = (Map) routingTable.get(gname);
303: if (map == null) {
304: sb.append("\tnull list of addresses\n");
305: } else if (map.isEmpty()) {
306: sb.append("\tempty list of addresses\n");
307: } else {
308: AddressEntry ae;
309: for (Iterator j = map.values().iterator(); j
310: .hasNext();) {
311: ae = (AddressEntry) j.next();
312: sb.append('\t');
313: sb.append(ae);
314: sb.append('\n');
315: }
316: }
317: }
318: }
319: return sb.toString();
320: }
321:
322: /**
323: * The main server loop. Runs on the JGroups Router Main Thread.
324: */
325: private void mainLoop() {
326: Socket sock = null;
327: DataInputStream input = null;
328: DataOutputStream output = null;
329: Address peer_addr = null, mbr, logical_addr;
330:
331: if (bindAddress == null) {
332: bindAddress = srvSock.getInetAddress();
333: }
334: System.out.println("GossipRouter started at " + new Date()
335: + "\nListening on port " + port + " bound on address "
336: + bindAddress + '\n');
337:
338: GossipData req;
339: String group;
340:
341: while (up && srvSock != null) {
342: try {
343: sock = srvSock.accept();
344: sock.setSoLinger(true, 500);
345: input = new DataInputStream(sock.getInputStream());
346: // if(log.isTraceEnabled())
347: // log.trace("accepted connection from " + sock);
348:
349: req = new GossipData();
350: req.readFrom(input);
351:
352: switch (req.getType()) {
353: case GossipRouter.REGISTER:
354: mbr = req.getAddress();
355: group = req.getGroup();
356: if (log.isTraceEnabled())
357: log.trace("REGISTER(" + group + ", " + mbr
358: + ")");
359: if (group == null || mbr == null) {
360: if (log.isErrorEnabled())
361: log
362: .error("group or member is null, cannot register member");
363: } else
364: addGossipEntry(group, mbr,
365: new AddressEntry(mbr));
366: Util.close(input);
367: Util.close(sock);
368: break;
369:
370: case GossipRouter.UNREGISTER:
371: mbr = req.getAddress();
372: group = req.getGroup();
373: if (log.isTraceEnabled())
374: log.trace("UNREGISTER(" + group + ", " + mbr
375: + ")");
376: if (group == null || mbr == null) {
377: if (log.isErrorEnabled())
378: log
379: .error("group or member is null, cannot unregister member");
380: } else
381: removeGossipEntry(group, mbr);
382: Util.close(input);
383: Util.close(output);
384: Util.close(sock);
385: break;
386:
387: case GossipRouter.GOSSIP_GET:
388: group = req.getGroup();
389: List mbrs = null;
390: Map map;
391: map = (Map) routingTable.get(group);
392: if (map != null) {
393: mbrs = new LinkedList(map.keySet());
394: }
395:
396: if (log.isTraceEnabled())
397: log.trace("GOSSIP_GET(" + group + ") --> "
398: + mbrs);
399: output = new DataOutputStream(sock
400: .getOutputStream());
401: GossipData rsp = new GossipData(
402: GossipRouter.GET_RSP, group, null, mbrs);
403: rsp.writeTo(output);
404: Util.close(input);
405: Util.close(output);
406: Util.close(sock);
407: break;
408:
409: case GossipRouter.ROUTER_GET:
410: group = req.getGroup();
411: output = new DataOutputStream(sock
412: .getOutputStream());
413:
414: List ret = null;
415: map = (Map) routingTable.get(group);
416: if (map != null) {
417: ret = new LinkedList(map.keySet());
418: } else
419: ret = new LinkedList();
420: if (log.isTraceEnabled())
421: log.trace("ROUTER_GET(" + group + ") --> "
422: + ret);
423: rsp = new GossipData(GossipRouter.GET_RSP, group,
424: null, ret);
425: rsp.writeTo(output);
426: Util.close(input);
427: Util.close(output);
428: Util.close(sock);
429: break;
430:
431: case GossipRouter.DUMP:
432: output = new DataOutputStream(sock
433: .getOutputStream());
434: output.writeUTF(dumpRoutingTable());
435: Util.close(input);
436: Util.close(output);
437: Util.close(sock);
438: break;
439:
440: case GossipRouter.CONNECT:
441: output = new DataOutputStream(sock
442: .getOutputStream());
443: peer_addr = new IpAddress(sock.getInetAddress(),
444: sock.getPort());
445: output = new DataOutputStream(sock
446: .getOutputStream());
447: logical_addr = req.getAddress();
448: String group_name = req.getGroup();
449:
450: if (log.isTraceEnabled())
451: log.trace("CONNECT(" + group_name + ", "
452: + logical_addr + ")");
453: SocketThread st = new SocketThread(sock, input,
454: group_name, logical_addr);
455: addEntry(group_name, logical_addr,
456: new AddressEntry(logical_addr, peer_addr,
457: sock, st, output));
458: st.start();
459: break;
460:
461: case GossipRouter.DISCONNECT:
462: Address addr = req.getAddress();
463: group_name = req.getGroup();
464: removeEntry(group_name, addr);
465: if (log.isTraceEnabled())
466: log.trace("DISCONNECT(" + group_name + ", "
467: + addr + ")");
468: Util.close(input);
469: Util.close(output);
470: Util.close(sock);
471: break;
472:
473: case GossipRouter.SHUTDOWN:
474: if (log.isInfoEnabled())
475: log.info("router shutting down");
476: Util.close(input);
477: Util.close(output);
478: Util.close(sock);
479: up = false;
480: break;
481: default:
482: if (log.isWarnEnabled())
483: log
484: .warn("received unkown gossip request (gossip="
485: + req + ')');
486: break;
487: }
488: } catch (Exception e) {
489: if (up)
490: if (log.isErrorEnabled())
491: log.error("failure handling a client request",
492: e);
493: Util.close(input);
494: Util.close(output);
495: Util.close(sock);
496: }
497: }
498: }
499:
500: /**
501: * Cleans the routing tables while the Router is going down.
502: */
503: private void cleanup() {
504:
505: // shutdown the routing threads and cleanup the tables
506: Map map;
507: for (Iterator i = routingTable.values().iterator(); i.hasNext();) {
508: map = (Map) i.next();
509: if (map != null) {
510: for (Iterator j = map.values().iterator(); j.hasNext();) {
511: AddressEntry e = (AddressEntry) j.next();
512: e.destroy();
513: }
514: }
515: }
516: routingTable.clear();
517: }
518:
519: /**
520: * Connects to the ServerSocket and sends the shutdown header.
521: */
522: private void shutdown() {
523: Socket s = null;
524: DataOutputStream dos = null;
525: try {
526: s = new Socket(srvSock.getInetAddress(), srvSock
527: .getLocalPort());
528: dos = new DataOutputStream(s.getOutputStream());
529: dos.writeInt(SHUTDOWN);
530: dos.writeUTF("");
531: } catch (Exception e) {
532: if (log.isErrorEnabled())
533: log.error("shutdown failed: " + e);
534: } finally {
535: Util.close(s);
536: Util.close(dos);
537: }
538: }
539:
540: /**
541: * Removes expired gossip entries (entries older than EXPIRY_TIME msec).
542: * @since 2.2.1
543: */
544: private void sweep() {
545: long diff, currentTime = System.currentTimeMillis();
546: int num_entries_removed = 0;
547:
548: Map.Entry entry, entry2;
549: Map map;
550: AddressEntry ae;
551: for (Iterator it = routingTable.entrySet().iterator(); it
552: .hasNext();) {
553: entry = (Map.Entry) it.next();
554: map = (Map) entry.getValue();
555: if (map == null || map.isEmpty()) {
556: it.remove();
557: continue;
558: }
559: for (Iterator it2 = map.entrySet().iterator(); it2
560: .hasNext();) {
561: entry2 = (Map.Entry) it2.next();
562: ae = (GossipRouter.AddressEntry) entry2.getValue();
563: diff = currentTime - ae.timestamp;
564: if (diff > expiryTime) {
565: it2.remove();
566: if (log.isTraceEnabled())
567: log.trace("removed " + ae.logical_addr + " ("
568: + diff + " msecs old)");
569: num_entries_removed++;
570: }
571: }
572: }
573:
574: if (num_entries_removed > 0) {
575: if (log.isTraceEnabled())
576: log.trace("done (removed " + num_entries_removed
577: + " entries)");
578: }
579: }
580:
581: private void route(Address dest, String dest_group, byte[] msg,
582: Address sender) {
583: //if(log.isTraceEnabled()) {
584: // int len=msg != null? msg.length : 0;
585: //log.trace("routing request from " + sender + " for " + dest_group + " to " +
586: // (dest == null? "ALL" : dest.toString()) + ", " + len + " bytes");
587: //}
588:
589: if (dest == null) { // send to all members in group dest.getChannelName()
590: if (dest_group == null) {
591: if (log.isErrorEnabled())
592: log.error("both dest address and group are null");
593: } else {
594: sendToAllMembersInGroup(dest_group, msg, sender);
595: }
596: } else {
597: // send to destination address
598: AddressEntry ae = findAddressEntry(dest_group, dest);
599: if (ae == null) {
600: if (log.isTraceEnabled())
601: log
602: .trace("cannot find "
603: + dest
604: + " in the routing table, \nrouting table=\n"
605: + dumpRoutingTable());
606: return;
607: }
608: if (ae.output == null) {
609: if (log.isErrorEnabled())
610: log
611: .error(dest
612: + " is associated with a null output stream");
613: return;
614: }
615: try {
616: sendToMember(dest, ae.output, msg, sender);
617: } catch (Exception e) {
618: if (log.isErrorEnabled())
619: log.error("failed sending message to " + dest
620: + ": " + e.getMessage());
621: removeEntry(dest_group, dest); // will close socket
622: }
623: }
624: }
625:
626: private void addEntry(String groupname, Address logical_addr,
627: AddressEntry entry) {
628: addEntry(groupname, logical_addr, entry, false);
629: }
630:
631: /**
632: * Adds a new member to the routing group.
633: */
634: private void addEntry(String groupname, Address logical_addr,
635: AddressEntry entry, boolean update_only) {
636: if (groupname == null || logical_addr == null) {
637: if (log.isErrorEnabled())
638: log
639: .error("groupname or logical_addr was null, entry was not added");
640: return;
641: }
642:
643: synchronized (routingTable) {
644: Map mbrs = (Map) routingTable.get(groupname);
645: if (mbrs == null) {
646: mbrs = new ConcurrentHashMap();
647: mbrs.put(logical_addr, entry);
648: routingTable.put(groupname, mbrs);
649: } else {
650: AddressEntry tmp = (AddressEntry) mbrs
651: .get(logical_addr);
652: if (tmp != null) { // already present
653: if (update_only) {
654: tmp.update();
655: return;
656: }
657: tmp.destroy();
658: }
659: mbrs.put(logical_addr, entry);
660: }
661: }
662: }
663:
664: private void removeEntry(String groupname, Address logical_addr) {
665: Map val;
666: val = (Map) routingTable.get(groupname);
667: if (val == null)
668: return;
669: synchronized (val) {
670: AddressEntry entry = (AddressEntry) val.get(logical_addr);
671: if (entry != null) {
672: entry.destroy();
673: val.remove(logical_addr);
674: }
675: }
676: }
677:
678: /**
679: * @return null if not found
680: */
681: private AddressEntry findAddressEntry(String group_name,
682: Address logical_addr) {
683: if (group_name == null || logical_addr == null)
684: return null;
685: Map val = (Map) routingTable.get(group_name);
686: if (val == null)
687: return null;
688: return (AddressEntry) val.get(logical_addr);
689: }
690:
691: /**
692: * Adds a new member to the group in the gossip table or renews the
693: * membership where is the case.
694: * @since 2.2.1
695: */
696: private void addGossipEntry(String groupname, Address logical_addr,
697: AddressEntry e) {
698: addEntry(groupname, logical_addr, e, true);
699: }
700:
701: private void removeGossipEntry(String groupname, Address mbr) {
702: removeEntry(groupname, mbr);
703: }
704:
705: private void sendToAllMembersInGroup(String groupname, byte[] msg,
706: Address sender) {
707: Map val;
708: val = (Map) routingTable.get(groupname);
709: if (val == null || val.isEmpty())
710: return;
711:
712: Map.Entry tmp;
713: AddressEntry entry;
714: synchronized (val) {
715: for (Iterator i = val.entrySet().iterator(); i.hasNext();) {
716: tmp = (Map.Entry) i.next();
717: entry = (GossipRouter.AddressEntry) tmp.getValue();
718: DataOutputStream dos = entry.output;
719:
720: if (dos != null) {
721: // send only to 'connected' members
722: try {
723: sendToMember(null, dos, msg, sender);
724: } catch (Exception e) {
725: if (log.isWarnEnabled())
726: log.warn("cannot send to "
727: + entry.logical_addr + ": "
728: + e.getMessage());
729: entry.destroy(); // this closes the socket
730: i.remove();
731: }
732: }
733: }
734: }
735: }
736:
737: /**
738: * @throws IOException
739: */
740: private void sendToMember(Address dest, DataOutputStream out,
741: byte[] msg, Address sender) throws IOException {
742: if (out == null)
743: return;
744:
745: if (discard_loopbacks && dest != null && dest.equals(sender)) {
746: return;
747: }
748:
749: synchronized (out) {
750: Util.writeAddress(dest, out);
751: out.writeInt(msg.length);
752: out.write(msg, 0, msg.length);
753: }
754: }
755:
756: /**
757: * Class used to store Addresses in both routing and gossip tables.
758: * If it is used for routing, sock and output have valid values, otherwise
759: * they're null and only the timestamp counts.
760: */
761: class AddressEntry {
762: Address logical_addr = null, physical_addr = null;
763: Socket sock = null;
764: DataOutputStream output = null;
765: long timestamp = 0;
766: final SocketThread thread;
767:
768: /**
769: * AddressEntry for a 'gossip' membership.
770: */
771: public AddressEntry(Address addr) {
772: this (addr, null, null, null, null);
773: }
774:
775: public AddressEntry(Address logical_addr,
776: Address physical_addr, Socket sock,
777: SocketThread thread, DataOutputStream output) {
778: this .logical_addr = logical_addr;
779: this .physical_addr = physical_addr;
780: this .sock = sock;
781: this .thread = thread;
782: this .output = output;
783: this .timestamp = System.currentTimeMillis();
784: }
785:
786: void destroy() {
787: if (thread != null) {
788: thread.finish();
789: }
790: Util.close(output);
791: output = null;
792: Util.close(sock);
793: sock = null;
794: timestamp = 0;
795: }
796:
797: public void update() {
798: timestamp = System.currentTimeMillis();
799: }
800:
801: public boolean equals(Object other) {
802: return logical_addr
803: .equals(((AddressEntry) other).logical_addr);
804: }
805:
806: public String toString() {
807: StringBuffer sb = new StringBuffer("logical addr=");
808: sb.append(logical_addr).append(" (").append(physical_addr)
809: .append(")");
810: //if(sock != null) {
811: // sb.append(", sock=");
812: //sb.append(sock);
813: //}
814: if (timestamp > 0) {
815: long diff = System.currentTimeMillis() - timestamp;
816: sb.append(", ").append(diff).append(" ms old");
817: }
818: return sb.toString();
819: }
820: }
821:
822: private static int threadCounter = 0;
823:
824: /**
825: * A SocketThread manages one connection to a client. Its main task is message routing.
826: */
827: class SocketThread extends Thread {
828: private volatile boolean active = true;
829: Socket sock = null;
830: DataInputStream input = null;
831: Address logical_addr = null;
832: String group_name = null;
833:
834: public SocketThread(Socket sock, DataInputStream ois,
835: String group_name, Address logical_addr) {
836: super (Util.getGlobalThreadGroup(), "SocketThread "
837: + (threadCounter++));
838: this .sock = sock;
839: input = ois;
840: this .group_name = group_name;
841: this .logical_addr = logical_addr;
842: }
843:
844: void closeSocket() {
845: Util.close(input);
846: Util.close(sock);
847: }
848:
849: void finish() {
850: active = false;
851: }
852:
853: public void run() {
854: byte[] buf;
855: int len;
856: Address dst_addr = null;
857: String gname;
858:
859: while (active) {
860: try {
861: // 1. Group name is first
862: gname = input.readUTF();
863:
864: // 2. Second is the destination address
865: dst_addr = Util.readAddress(input);
866:
867: // 3. Then the length of the byte buffer representing the message
868: len = input.readInt();
869: if (len == 0) {
870: if (log.isWarnEnabled())
871: log.warn("received null message");
872: continue;
873: }
874:
875: // 4. Finally the message itself, as a byte buffer
876: buf = new byte[len];
877: input.readFully(buf, 0, buf.length); // message
878: } catch (Exception io_ex) {
879: if (log.isTraceEnabled())
880: log
881: .trace(sock.getInetAddress()
882: .getHostName()
883: + ':'
884: + sock.getPort()
885: + " closed connection; removing it from routing table");
886: removeEntry(group_name, logical_addr); // will close socket
887: return;
888: }
889:
890: try {
891: route(dst_addr, gname, buf, logical_addr);
892: } catch (Exception e) {
893: if (log.isErrorEnabled())
894: log.error("failed routing request to "
895: + dst_addr, e);
896: break;
897: }
898: }
899: closeSocket();
900: }
901:
902: }
903:
904: public static void main(String[] args) throws Exception {
905: String arg;
906: int port = 12001;
907: long expiry = GossipRouter.EXPIRY_TIME;
908: long timeout = GossipRouter.GOSSIP_REQUEST_TIMEOUT;
909: long routingTimeout = GossipRouter.ROUTING_CLIENT_REPLY_TIMEOUT;
910: GossipRouter router = null;
911: String bind_addr = null;
912:
913: for (int i = 0; i < args.length; i++) {
914: arg = args[i];
915: if ("-port".equals(arg)) {
916: port = Integer.parseInt(args[++i]);
917: continue;
918: }
919: if ("-bindaddress".equals(arg) || "-bind_addr".equals(arg)) {
920: bind_addr = args[++i];
921: continue;
922: }
923: if ("-expiry".equals(arg)) {
924: expiry = Long.parseLong(args[++i]);
925: continue;
926: }
927: if ("-timeout".equals(arg)) {
928: timeout = Long.parseLong(args[++i]);
929: continue;
930: }
931: if ("-rtimeout".equals(arg)) {
932: routingTimeout = Long.parseLong(args[++i]);
933: continue;
934: }
935: help();
936: return;
937: }
938: System.out.println("GossipRouter is starting...");
939:
940: try {
941: ClassConfigurator.getInstance(true);
942: router = new GossipRouter(port, bind_addr, expiry, timeout,
943: routingTimeout);
944: router.start();
945: } catch (Exception e) {
946: System.err.println(e);
947: }
948: }
949:
950: static void help() {
951: System.out.println();
952: System.out
953: .println("GossipRouter [-port <port>] [-bind_addr <address>] [options]");
954: System.out.println("Options: ");
955: System.out
956: .println(" -expiry <msecs> - Time until a gossip cache entry expires.");
957: System.out
958: .println(" -timeout <msecs> - Number of millisecs the router waits to receive");
959: System.out
960: .println(" a gossip request after connection was established;");
961: System.out
962: .println(" upon expiration, the router initiates the routing");
963: System.out
964: .println(" protocol on the connection.");
965: }
966:
967: }
|