001: package com.tc.net.proxy;
002:
003: import com.tc.util.StringUtil;
004:
005: import java.io.BufferedReader;
006: import java.io.File;
007: import java.io.FileOutputStream;
008: import java.io.IOException;
009: import java.io.InputStream;
010: import java.io.InputStreamReader;
011: import java.io.OutputStream;
012: import java.net.InetAddress;
013: import java.net.InetSocketAddress;
014: import java.net.ServerSocket;
015: import java.net.Socket;
016: import java.net.SocketTimeoutException;
017: import java.util.Date;
018: import java.util.HashSet;
019: import java.util.Set;
020:
021: /*
022: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
023: * notice. All rights reserved.
024: */
025:
026: /**
027: * A simple TCP proxy (with round robin load balancing support) to simulate network delays and help debug network
028: * streams.
029: */
030: public class TCPProxy {
031:
032: private volatile boolean debug;
033: private long delay;
034: private final int listenPort;
035: private final InetSocketAddress[] endpoints;
036: private int roundRobinSequence;
037: private ServerSocket serverSocket;
038: private Thread acceptThread;
039: private volatile boolean stop;
040: private final Set connections = new HashSet();
041: private final File logDir;
042: private final boolean logData;
043: private boolean reuseAddress = false;
044:
045: public TCPProxy(int listenPort, InetAddress destHost, int destPort,
046: long delay, boolean logData, File logDir) {
047: this (listenPort,
048: new InetSocketAddress[] { new InetSocketAddress(
049: destHost, destPort) }, delay, logData, logDir);
050: }
051:
052: /**
053: * If multiple endpoints are used, then the proxy will round robin between them.
054: */
055: public TCPProxy(int listenPort, InetSocketAddress[] endpoints,
056: long delay, boolean logData, File logDir) {
057: this .roundRobinSequence = 0;
058: this .debug = false;
059: this .stop = false;
060: this .listenPort = listenPort;
061: this .endpoints = endpoints;
062: this .logData = logData;
063: this .logDir = logDir;
064: setDelay(delay);
065:
066: verifyEndpoints();
067: }
068:
069: private void verifyEndpoints() {
070: for (int i = 0; i < endpoints.length; i++) {
071: InetSocketAddress addr = endpoints[i];
072: if (addr.getAddress() == null) {
073: //
074: throw new RuntimeException(
075: "Cannot resolve address for host "
076: + addr.getHostName());
077: }
078: }
079: }
080:
081: public void setReuseAddress(boolean reuse) {
082: reuseAddress = reuse;
083: }
084:
085: /*
086: * Probe if backend is ready for connection. Make sure L2 is ready before calling start().
087: */
088: public boolean probeBackendConnection() {
089: Socket connectedSocket = null;
090: for (int pos = 0; connectedSocket == null
091: && pos < endpoints.length; ++pos) {
092: final int roundRobinOffset = (pos + roundRobinSequence)
093: % endpoints.length;
094: try {
095: connectedSocket = new Socket(
096: endpoints[roundRobinOffset].getAddress(),
097: endpoints[roundRobinOffset].getPort());
098: break;
099: } catch (IOException ioe) {
100: //
101: }
102: }
103: if (connectedSocket != null) {
104: try {
105: connectedSocket.close();
106: } catch (Exception e) {
107: //
108: }
109: return (true);
110: } else
111: return (false);
112: }
113:
114: public synchronized void start() throws IOException {
115: stop();
116:
117: log("Starting listener on port " + listenPort
118: + ", proxying to "
119: + StringUtil.toString(endpoints, ", ", "[", "]")
120: + " with " + getDelay() + "ms delay");
121:
122: if (!reuseAddress) {
123: serverSocket = new ServerSocket(listenPort);
124: } else {
125: serverSocket = new ServerSocket();
126: serverSocket.setReuseAddress(true);
127: try {
128: serverSocket.bind(new InetSocketAddress(
129: (InetAddress) null, listenPort), 50);
130: } catch (IOException e) {
131: serverSocket.close();
132: throw e;
133: }
134: }
135:
136: stop = false;
137:
138: final TCPProxy ME = this ;
139: acceptThread = new Thread(new Runnable() {
140: public void run() {
141: ME.run();
142: }
143: }, "Accept thread (port " + listenPort + ")");
144: acceptThread.start();
145: }
146:
147: /*
148: * Stop without joing dead threads. This is to workaround the issue of taking too long to stop proxy which longer than
149: * OOO's L1 reconnect timeout.
150: */
151: public synchronized void fastStop() {
152: subStop(false);
153: }
154:
155: public synchronized void stop() {
156: subStop(true);
157: }
158:
159: synchronized void subStop(boolean waitDeadThread) {
160: stop = true;
161:
162: try {
163: if (serverSocket != null) {
164: serverSocket.close();
165: }
166: } catch (Exception e) {
167: log("Error closing serverSocket", e);
168: } finally {
169: serverSocket = null;
170: }
171:
172: /*
173: * Observed on windows-xp. The ServerSocket is still hanging around after "close()", until someone makes a new
174: * connection. To make sure the old ServerSocket and accept thread go away for good, fake a connection to the old
175: * socket.
176: */
177: try {
178: Socket sk = new Socket("localhost", listenPort);
179: sk.close();
180: } catch (Exception x) {
181: // that's fine for fake connection.
182: }
183:
184: try {
185: if (acceptThread != null) {
186: acceptThread.interrupt();
187:
188: try {
189: acceptThread.join(10000);
190: } catch (InterruptedException e) {
191: log("Interrupted while join()'ing acceptor thread",
192: e);
193: }
194: }
195: } finally {
196: acceptThread = null;
197: }
198:
199: closeAllConnections(waitDeadThread);
200: }
201:
202: public synchronized void closeClientConnections(
203: boolean waitDeadThread, boolean split) {
204: Connection conns[];
205: synchronized (connections) {
206: conns = (Connection[]) connections
207: .toArray(new Connection[] {});
208: }
209:
210: for (int i = 0; i < conns.length; i++) {
211: try {
212: conns[i].closeClientHalf(waitDeadThread, split);
213: } catch (Exception e) {
214: log("Error closing client-side connection "
215: + conns[i].toString(), e);
216: }
217: }
218: }
219:
220: synchronized void closeAllConnections(boolean waitDeadThread) {
221: Connection conns[];
222: synchronized (connections) {
223: conns = (Connection[]) connections
224: .toArray(new Connection[] {});
225: }
226:
227: for (int i = 0; i < conns.length; i++) {
228: try {
229: conns[i].close(waitDeadThread);
230: } catch (Exception e) {
231: log("Error closing connection " + conns[i].toString(),
232: e);
233: }
234: }
235: }
236:
237: public void toggleDebug() {
238: debug = !debug;
239: }
240:
241: public synchronized long getDelay() {
242: return delay;
243: }
244:
245: public synchronized void setDelay(long newDelay) {
246: if (newDelay < 0) {
247: throw new IllegalArgumentException(
248: "Delay must be greater than or equal to zero");
249: }
250: delay = newDelay;
251: }
252:
253: void interrupt() {
254: Connection conns[];
255: synchronized (connections) {
256: conns = (Connection[]) connections
257: .toArray(new Connection[] {});
258: }
259:
260: for (int i = 0; i < conns.length; i++) {
261: conns[i].interrupt();
262: }
263: }
264:
265: private void run() {
266: while (!stop) {
267: final Socket socket;
268: try {
269: socket = serverSocket.accept();
270: } catch (IOException ioe) {
271: continue;
272: }
273:
274: if (Thread.interrupted()) {
275: continue;
276: }
277:
278: if (socket != null) {
279: debug("Accepted connection from " + socket.toString());
280:
281: try {
282: new Connection(socket, this , logData, logDir);
283: } catch (IOException ioe) {
284: log("Error connecting to any of remote hosts "
285: + StringUtil.toString(endpoints, ", ", "[",
286: "]") + ", " + ioe.getMessage());
287: try {
288: socket.close();
289: } catch (IOException clientIOE) {
290: log("Unable to close client socket after failing to proxy: "
291: + clientIOE.getMessage());
292: }
293: }
294: }
295: }
296: }
297:
298: private synchronized int getAndIncrementRoundRobinSequence() {
299: return roundRobinSequence++;
300: }
301:
302: void deregister(Connection connection) {
303: synchronized (connections) {
304: connections.remove(connection);
305: }
306: }
307:
308: void register(Connection connection) {
309: synchronized (connections) {
310: connections.add(connection);
311: }
312: }
313:
314: public void status() {
315: synchronized (System.err) {
316: System.err.println();
317: System.err.println("Listening on port : " + listenPort);
318: System.err.println("Connection delay : " + getDelay()
319: + "ms");
320: System.err.println("Proxying to : "
321: + StringUtil.toString(endpoints, ", ", "[", "]"));
322: System.err.println("Debug Logging : " + debug);
323: System.err.println("Active connections:");
324:
325: Connection conns[];
326: synchronized (connections) {
327: conns = (Connection[]) connections
328: .toArray(new Connection[] {});
329: }
330:
331: for (int i = 0; i < conns.length; i++) {
332: System.err.println("\t" + i + ": "
333: + conns[i].toString());
334: }
335:
336: if (conns.length == 0) {
337: System.err.println("\tNONE");
338: }
339: }
340: }
341:
342: private static void help() {
343: synchronized (System.err) {
344: System.err.println();
345: System.err.println("h - this help message");
346: System.err.println("s - print proxy status");
347: System.err
348: .println("d <num> - adjust the delay time to <num> milliseconds");
349: System.err
350: .println("c - close all active connections");
351: System.err.println("l - toggle debug logging");
352: System.err.println("q - quit (shutdown proxy)");
353: }
354: }
355:
356: public static void main(String[] args) throws IOException,
357: InterruptedException {
358: if ((args.length < 2) || (args.length > 3)) {
359: usage();
360: System.exit(1);
361: }
362:
363: final int listenPort = Integer.valueOf(args[0]).intValue();
364: final String[] endpointStrings = args[1].split(",");
365: final InetSocketAddress[] endpoints = new InetSocketAddress[endpointStrings.length];
366: for (int pos = 0; pos < endpointStrings.length; ++pos) {
367: final int separatorIdx = endpointStrings[pos].indexOf(":");
368: endpoints[pos] = new InetSocketAddress(endpointStrings[pos]
369: .substring(0, separatorIdx), Integer
370: .parseInt(endpointStrings[pos]
371: .substring(separatorIdx + 1)));
372: }
373:
374: long delay = 0;
375: if (args.length == 3) {
376: delay = (Long.valueOf(args[2]).longValue());
377: }
378:
379: // If this is set to true then we are in non-interactive mode and don't print a prompt
380: final boolean daemonMode = Boolean.getBoolean("daemon");
381:
382: final TCPProxy theProxy = new TCPProxy(listenPort, endpoints,
383: delay, false, null);
384: theProxy.start();
385:
386: if (daemonMode) {
387: final Object o = new Object();
388: synchronized (o) {
389: o.wait();
390: }
391: } else {
392: try {
393: BufferedReader stdin = new BufferedReader(
394: new InputStreamReader(System.in));
395: String line = "";
396: prompt();
397: while ((line = stdin.readLine()) != null) {
398: line = line.trim();
399:
400: if (line.toLowerCase().startsWith("q")) {
401: break;
402: }
403:
404: try {
405: if (line.toLowerCase().startsWith("h")) {
406: help();
407: continue;
408: }
409:
410: if (line.toLowerCase().startsWith("s")) {
411: theProxy.status();
412: continue;
413: }
414:
415: if (line.toLowerCase().startsWith("c")) {
416: theProxy.closeAllConnections(true);
417: out("all connections closed");
418: continue;
419: }
420:
421: if (line.toLowerCase().startsWith("l")) {
422: theProxy.toggleDebug();
423: out("debug logging toggled");
424: continue;
425: }
426:
427: if (line.toLowerCase().startsWith("d")) {
428: if (line.length() <= 2) {
429: out("you must supply a delay value");
430: continue;
431: }
432:
433: try {
434: theProxy.setDelay(Long.valueOf(
435: line.substring(2)).longValue());
436: theProxy.interrupt();
437: } catch (Exception e) {
438: out(e);
439: }
440: continue;
441: }
442: } catch (Exception e) {
443: out(e);
444: } finally {
445: prompt();
446: }
447: }
448: } finally {
449: theProxy.stop();
450: }
451: }
452: }
453:
454: private static class Connection {
455: private final Socket client;
456: private final Socket proxy;
457: private final TCPProxy parent;
458: private final Thread clientThread;
459: private final Thread proxyThread;
460: private final Object closeLock = new Object();
461: private volatile boolean stopConn = false;
462: private final long connectTime;
463: private long lastActivity;
464: private long clientBytesIn = 0;
465: private long proxyBytesIn = 0;
466: private final OutputStream clientLog;
467: private final OutputStream proxyLog;
468: private volatile boolean allowSplit = false;
469:
470: Connection(Socket client, TCPProxy parent, boolean logData,
471: File logDir) throws IOException {
472: this .parent = parent;
473: this .client = client;
474: this .connectTime = System.currentTimeMillis();
475: this .lastActivity = this .connectTime;
476:
477: // Round robin and try connecting to the next available backend server; this is done by adding an ever increasing
478: // sequence number to the offset into the endpoint array (and then mod'ing it so you don't index past the array);
479: // this will ensure that you loop through the array in order and start over at the beginning once you reach the
480: // end
481: IOException lastConnectException = null;
482: Socket connectedSocket = null;
483: final int roundRobinSequence = parent
484: .getAndIncrementRoundRobinSequence();
485: for (int pos = 0; connectedSocket == null
486: && pos < parent.endpoints.length; ++pos) {
487: final int roundRobinOffset = (pos + roundRobinSequence)
488: % parent.endpoints.length;
489: try {
490: connectedSocket = new Socket(
491: parent.endpoints[roundRobinOffset]
492: .getAddress(),
493: parent.endpoints[roundRobinOffset]
494: .getPort());
495: break;
496: } catch (IOException ioe) {
497: lastConnectException = ioe;
498: }
499: }
500: if (connectedSocket == null) {
501: final IOException ioe = lastConnectException != null ? lastConnectException
502: : new IOException(
503: "Unable to establish a proxy connection to a back end server: "
504: + StringUtil.toString(
505: parent.endpoints, ",",
506: "[", "]"));
507: throw ioe;
508: } else {
509: proxy = connectedSocket;
510: }
511:
512: if (logData) {
513: final String log = client.getLocalAddress()
514: .getHostName().toString()
515: + "." + client.getPort();
516: clientLog = new FileOutputStream(new File(logDir, log
517: + ".in"), false);
518: proxyLog = new FileOutputStream(new File(logDir, log
519: + ".out"), false);
520: } else {
521: clientLog = null;
522: proxyLog = null;
523: }
524:
525: proxy.setSoTimeout(100);
526: client.setSoTimeout(100);
527: // TcpDealy can cause multiple times slower for small packages.
528: proxy.setTcpNoDelay(true);
529: client.setTcpNoDelay(true);
530:
531: final InputStream clientIs = client.getInputStream();
532: final OutputStream clientOs = client.getOutputStream();
533: final InputStream proxyIs = proxy.getInputStream();
534: final OutputStream proxyOs = proxy.getOutputStream();
535:
536: parent.register(this );
537:
538: clientThread = new Thread(new Runnable() {
539: public void run() {
540: runHalf(clientIs, proxyOs, true, clientLog,
541: Connection.this .client);
542: }
543: }, "Client thread for connection " + client + " proxy to "
544: + proxy);
545:
546: proxyThread = new Thread(new Runnable() {
547: public void run() {
548: runHalf(proxyIs, clientOs, false, proxyLog, proxy);
549: }
550: }, "Proxy thread for connection " + client + " proxy to "
551: + proxy);
552:
553: clientThread.start();
554: proxyThread.start();
555: }
556:
557: private synchronized void activity() {
558: lastActivity = System.currentTimeMillis();
559: }
560:
561: private synchronized long getLastActivity() {
562: return lastActivity;
563: }
564:
565: private synchronized void addProxyBytesIn(long bytesIn) {
566: this .proxyBytesIn += bytesIn;
567: }
568:
569: private synchronized void addClientBytesIn(long bytesIn) {
570: this .clientBytesIn += bytesIn;
571: }
572:
573: private synchronized long getProxyBytesIn() {
574: return this .proxyBytesIn;
575: }
576:
577: private synchronized long getClientBytesIn() {
578: return this .clientBytesIn;
579: }
580:
581: public String toString() {
582: return "Client: " + client + ", proxy to: " + proxy
583: + ", connect: " + new Date(connectTime)
584: + ", idle: "
585: + (System.currentTimeMillis() - getLastActivity())
586: + ", bytes from client: " + getClientBytesIn()
587: + ", bytes from endpoint: " + getProxyBytesIn();
588: }
589:
590: private void delay() {
591: final long sleep = parent.getDelay();
592:
593: if (sleep > 0) {
594: try {
595: Thread.sleep(sleep);
596: } catch (InterruptedException e) {
597: // ignore
598: }
599: }
600: }
601:
602: private void runHalf(InputStream src, OutputStream dest,
603: boolean isClientHalf, OutputStream log, Socket s) {
604: byte buffer[] = new byte[4096];
605:
606: while (!stopConn) {
607: int bytesRead = 0;
608: try {
609: bytesRead = src.read(buffer);
610: } catch (SocketTimeoutException ste) {
611: bytesRead = ste.bytesTransferred;
612: } catch (IOException ioe) {
613: parent.debug("IOException on "
614: + (isClientHalf ? "client" : "proxy")
615: + " connection", ioe);
616: return;
617: } finally {
618: if (bytesRead > 0) {
619: try {
620: if (log != null) {
621: log.write(buffer, 0, bytesRead);
622: log.flush();
623: }
624: } catch (IOException e) {
625: throw new RuntimeException(e);
626: }
627: parent.debug("read " + bytesRead + " on "
628: + (isClientHalf ? "client" : "proxy")
629: + " connection");
630: if (isClientHalf)
631: addClientBytesIn(bytesRead);
632: else
633: addProxyBytesIn(bytesRead);
634: }
635: }
636:
637: if (bytesRead < 0) {
638: // delay();
639: if (!allowSplit) {
640: close(true);
641: }
642: return;
643: }
644:
645: if (bytesRead > 0) {
646: activity();
647: delay();
648:
649: try {
650: dest.write(buffer, 0, bytesRead);
651: dest.flush();
652: } catch (IOException ioe) {
653: if (!allowSplit) {
654: close(true);
655: }
656: return;
657: }
658: }
659: }
660: }
661:
662: void interrupt() {
663: try {
664: clientThread.interrupt();
665: } finally {
666: proxyThread.interrupt();
667: }
668: }
669:
670: void closeClientHalf(boolean wait, boolean split) {
671: this .allowSplit = split;
672: try {
673: closeHalf(client, clientThread, clientLog, wait);
674: } catch (Throwable t) {
675: t.printStackTrace();
676: }
677:
678: }
679:
680: void closeProxyHalf(boolean wait, boolean split) {
681: this .allowSplit = split;
682: try {
683: closeHalf(proxy, proxyThread, proxyLog, wait);
684: } catch (Throwable t) {
685: t.printStackTrace();
686: }
687: }
688:
689: private static void closeHalf(Socket socket, Thread thread,
690: OutputStream out, boolean wait) {
691: try {
692: try {
693: if (socket != null)
694: socket.close();
695: } catch (IOException e) {
696: // ignore
697: }
698:
699: thread.interrupt();
700:
701: if (wait) {
702: try {
703: thread.join(1000);
704: } catch (InterruptedException ie) {
705: // ignore
706: }
707: }
708: } finally {
709: try {
710: if (out != null) {
711: out.close();
712: }
713: } catch (Exception e) {
714: e.printStackTrace();
715: }
716: }
717: }
718:
719: void close(boolean waitDeadThread) {
720: synchronized (closeLock) {
721: if (stopConn)
722: return;
723: stopConn = true;
724: }
725:
726: try {
727: closeClientHalf(waitDeadThread, false);
728: closeProxyHalf(waitDeadThread, false);
729: } finally {
730: parent.deregister(this );
731: }
732: }
733: }
734:
735: private static void prompt() {
736: synchronized (System.err) {
737: System.err.print("\nproxy> ");
738: System.err.flush();
739: }
740: }
741:
742: private static void out(String message) {
743: synchronized (System.err) {
744: System.err.println(message);
745: }
746: }
747:
748: private static void out(Throwable t) {
749: if (t == null)
750: return;
751: synchronized (System.err) {
752: t.printStackTrace(System.err);
753: }
754: }
755:
756: private static void log(String message) {
757: log(message, null);
758: }
759:
760: private static void log(String message, Throwable t) {
761: synchronized (System.err) {
762: System.err.println(new Date() + ": " + message);
763: if (t != null) {
764: t.printStackTrace(System.err);
765: }
766: }
767: }
768:
769: private void debug(String message) {
770: debug(message, null);
771: }
772:
773: private void debug(String message, Throwable t) {
774: if (debug)
775: log(message, t);
776: }
777:
778: private static void usage() {
779: System.err
780: .println("usage: TCPProxy <listen port> <endpoint[,endpoint...]> [delay]");
781: System.err
782: .println(" <listen port> - The port the proxy should listen on");
783: System.err
784: .println(" <endpoint> - Comma separated list of 1 or more <host>:<port> pairs to round robin requests to");
785: System.err
786: .println(" [delay] - Millisecond delay between network data (optional, default: 0)");
787: }
788:
789: }
|