001: package ch.ethz.ssh2.transport;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.io.OutputStream;
006: import java.net.InetAddress;
007: import java.net.InetSocketAddress;
008: import java.net.Socket;
009: import java.net.UnknownHostException;
010: import java.security.SecureRandom;
011: import java.util.Vector;
012:
013: import ch.ethz.ssh2.ConnectionInfo;
014: import ch.ethz.ssh2.ConnectionMonitor;
015: import ch.ethz.ssh2.DHGexParameters;
016: import ch.ethz.ssh2.HTTPProxyData;
017: import ch.ethz.ssh2.HTTPProxyException;
018: import ch.ethz.ssh2.ProxyData;
019: import ch.ethz.ssh2.ServerHostKeyVerifier;
020: import ch.ethz.ssh2.crypto.Base64;
021: import ch.ethz.ssh2.crypto.CryptoWishList;
022: import ch.ethz.ssh2.crypto.cipher.BlockCipher;
023: import ch.ethz.ssh2.crypto.digest.MAC;
024: import ch.ethz.ssh2.log.Logger;
025: import ch.ethz.ssh2.packets.PacketDisconnect;
026: import ch.ethz.ssh2.packets.Packets;
027: import ch.ethz.ssh2.packets.TypesReader;
028: import ch.ethz.ssh2.util.Tokenizer;
029:
/*
 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel
 * packets are allowed during key exchange; on the other side, we need to blindly
 * ignore the next _packet_ if the KEX guess was wrong. How do we know that
 * the next packet is not a channel data packet? Yes, we could check if it is in
 * the KEX range. But the standard says nothing about this. The OpenSSH guys
 * block local "normal" traffic during KEX. That's fine - however, they assume
 * that the other side is doing the same. During re-key, if they receive traffic
 * other than KEX, they become horribly irritated and kill the connection. Since
 * we are very likely going to communicate with OpenSSH servers, we have to play
 * the same game - even though we could do better.
 *
 * btw: having stdout and stderr on the same channel, with a shared window, is
 * also a VERY good idea... =(
 */
045:
046: /**
047: * TransportManager.
048: *
049: * @author Christian Plattner, plattner@inf.ethz.ch
050: * @version $Id: TransportManager.java,v 1.16 2006/08/11 12:24:00 cplattne Exp $
051: */
052: public class TransportManager {
053: private static final Logger log = Logger
054: .getLogger(TransportManager.class);
055:
056: class HandlerEntry {
057: MessageHandler mh;
058: int low;
059: int high;
060: }
061:
062: private final Vector asynchronousQueue = new Vector();
063: private Thread asynchronousThread = null;
064:
065: class AsynchronousWorker extends Thread {
066: public void run() {
067: while (true) {
068: byte[] msg = null;
069:
070: synchronized (asynchronousQueue) {
071: if (asynchronousQueue.size() == 0) {
072: /* After the queue is empty for about 2 seconds, stop this thread */
073:
074: try {
075: asynchronousQueue.wait(2000);
076: } catch (InterruptedException e) {
077: /* OKOK, if somebody interrupts us, then we may die earlier. */
078: }
079:
080: if (asynchronousQueue.size() == 0) {
081: asynchronousThread = null;
082: return;
083: }
084: }
085:
086: msg = (byte[]) asynchronousQueue.remove(0);
087: }
088:
089: /* The following invocation may throw an IOException.
090: * There is no point in handling it - it simply means
091: * that the connection has a problem and we should stop
092: * sending asynchronously messages. We do not need to signal that
093: * we have exited (asynchronousThread = null): further
094: * messages in the queue cannot be sent by this or any
095: * other thread.
096: * Other threads will sooner or later (when receiving or
097: * sending the next message) get the same IOException and
098: * get to the same conclusion.
099: */
100:
101: try {
102: sendMessage(msg);
103: } catch (IOException e) {
104: return;
105: }
106: }
107: }
108: }
109:
110: String hostname;
111: int port;
112: final Socket sock = new Socket();
113:
114: Object connectionSemaphore = new Object();
115:
116: boolean flagKexOngoing = false;
117: boolean connectionClosed = false;
118:
119: Throwable reasonClosedCause = null;
120:
121: TransportConnection tc;
122: KexManager km;
123:
124: Vector messageHandlers = new Vector();
125:
126: Thread receiveThread;
127:
128: Vector connectionMonitors = new Vector();
129: boolean monitorsWereInformed = false;
130:
131: /**
132: * There were reports that there are JDKs which use
133: * the resolver even though one supplies a dotted IP
134: * address in the Socket constructor. That is why we
135: * try to generate the InetAdress "by hand".
136: *
137: * @param host
138: * @return the InetAddress
139: * @throws UnknownHostException
140: */
141: private InetAddress createInetAddress(String host)
142: throws UnknownHostException {
143: /* Check if it is a dotted IP4 address */
144:
145: InetAddress addr = parseIPv4Address(host);
146:
147: if (addr != null)
148: return addr;
149:
150: return InetAddress.getByName(host);
151: }
152:
153: private InetAddress parseIPv4Address(String host)
154: throws UnknownHostException {
155: if (host == null)
156: return null;
157:
158: String[] quad = Tokenizer.parseTokens(host, '.');
159:
160: if ((quad == null) || (quad.length != 4))
161: return null;
162:
163: byte[] addr = new byte[4];
164:
165: for (int i = 0; i < 4; i++) {
166: int part = 0;
167:
168: if ((quad[i].length() == 0) || (quad[i].length() > 3))
169: return null;
170:
171: for (int k = 0; k < quad[i].length(); k++) {
172: char c = quad[i].charAt(k);
173:
174: /* No, Character.isDigit is not the same */
175: if ((c < '0') || (c > '9'))
176: return null;
177:
178: part = part * 10 + (c - '0');
179: }
180:
181: if (part > 255) /* 300.1.2.3 is invalid =) */
182: return null;
183:
184: addr[i] = (byte) part;
185: }
186:
187: return InetAddress.getByAddress(host, addr);
188: }
189:
/**
 * Remembers the target of the connection. No network activity happens
 * here; the socket is connected later by {@code initialize}.
 *
 * @param host name or address of the SSH server
 * @param port TCP port of the SSH server
 * @throws IOException declared for callers; not thrown by this body
 */
public TransportManager(String host, int port) throws IOException {
    hostname = host;
    this.port = port;
}
194:
195: public int getPacketOverheadEstimate() {
196: return tc.getPacketOverheadEstimate();
197: }
198:
199: public void setTcpNoDelay(boolean state) throws IOException {
200: sock.setTcpNoDelay(state);
201: }
202:
203: public void setSoTimeout(int timeout) throws IOException {
204: sock.setSoTimeout(timeout);
205: }
206:
207: public ConnectionInfo getConnectionInfo(int kexNumber)
208: throws IOException {
209: return km.getOrWaitForConnectionInfo(kexNumber);
210: }
211:
212: public Throwable getReasonClosedCause() {
213: synchronized (connectionSemaphore) {
214: return reasonClosedCause;
215: }
216: }
217:
218: public byte[] getSessionIdentifier() {
219: return km.sessionId;
220: }
221:
222: public void close(Throwable cause, boolean useDisconnectPacket) {
223: if (useDisconnectPacket == false) {
224: /* OK, hard shutdown - do not aquire the semaphore,
225: * perhaps somebody is inside (and waits until the remote
226: * side is ready to accept new data). */
227:
228: try {
229: sock.close();
230: } catch (IOException ignore) {
231: }
232:
233: /* OK, whoever tried to send data, should now agree that
234: * there is no point in further waiting =)
235: * It is safe now to aquire the semaphore.
236: */
237: }
238:
239: synchronized (connectionSemaphore) {
240: if (connectionClosed == false) {
241: if (useDisconnectPacket == true) {
242: try {
243: byte[] msg = new PacketDisconnect(
244: Packets.SSH_DISCONNECT_BY_APPLICATION,
245: cause.getMessage(), "").getPayload();
246: if (tc != null)
247: tc.sendMessage(msg);
248: } catch (IOException ignore) {
249: }
250:
251: try {
252: sock.close();
253: } catch (IOException ignore) {
254: }
255: }
256:
257: connectionClosed = true;
258: reasonClosedCause = cause; /* may be null */
259: }
260: connectionSemaphore.notifyAll();
261: }
262:
263: /* No check if we need to inform the monitors */
264:
265: Vector monitors = null;
266:
267: synchronized (this ) {
268: /* Short term lock to protect "connectionMonitors"
269: * and "monitorsWereInformed"
270: * (they may be modified concurrently)
271: */
272:
273: if (monitorsWereInformed == false) {
274: monitorsWereInformed = true;
275: monitors = (Vector) connectionMonitors.clone();
276: }
277: }
278:
279: if (monitors != null) {
280: for (int i = 0; i < monitors.size(); i++) {
281: try {
282: ConnectionMonitor cmon = (ConnectionMonitor) monitors
283: .elementAt(i);
284: cmon.connectionLost(reasonClosedCause);
285: } catch (Exception ignore) {
286: }
287: }
288: }
289: }
290:
291: private void establishConnection(ProxyData proxyData,
292: int connectTimeout) throws IOException {
293: /* See the comment for createInetAddress() */
294:
295: if (proxyData == null) {
296: InetAddress addr = createInetAddress(hostname);
297: sock.connect(new InetSocketAddress(addr, port),
298: connectTimeout);
299: sock.setSoTimeout(0);
300: return;
301: }
302:
303: if (proxyData instanceof HTTPProxyData) {
304: HTTPProxyData pd = (HTTPProxyData) proxyData;
305:
306: /* At the moment, we only support HTTP proxies */
307:
308: InetAddress addr = createInetAddress(pd.proxyHost);
309: sock.connect(new InetSocketAddress(addr, pd.proxyPort),
310: connectTimeout);
311: sock.setSoTimeout(0);
312:
313: /* OK, now tell the proxy where we actually want to connect to */
314:
315: StringBuffer sb = new StringBuffer();
316:
317: sb.append("CONNECT ");
318: sb.append(hostname);
319: sb.append(':');
320: sb.append(port);
321: sb.append(" HTTP/1.0\r\n");
322:
323: if ((pd.proxyUser != null) && (pd.proxyPass != null)) {
324: String credentials = pd.proxyUser + ":" + pd.proxyPass;
325: char[] encoded = Base64.encode(credentials.getBytes());
326: sb.append("Proxy-Authorization: Basic ");
327: sb.append(encoded);
328: sb.append("\r\n");
329: }
330:
331: if (pd.requestHeaderLines != null) {
332: for (int i = 0; i < pd.requestHeaderLines.length; i++) {
333: if (pd.requestHeaderLines[i] != null) {
334: sb.append(pd.requestHeaderLines[i]);
335: sb.append("\r\n");
336: }
337: }
338: }
339:
340: sb.append("\r\n");
341:
342: OutputStream out = sock.getOutputStream();
343:
344: out.write(sb.toString().getBytes());
345: out.flush();
346:
347: /* Now parse the HTTP response */
348:
349: byte[] buffer = new byte[1024];
350: InputStream in = sock.getInputStream();
351:
352: int len = ClientServerHello.readLineRN(in, buffer);
353:
354: String httpReponse = new String(buffer, 0, len);
355:
356: if (httpReponse.startsWith("HTTP/") == false)
357: throw new IOException(
358: "The proxy did not send back a valid HTTP response.");
359:
360: /* "HTTP/1.X XYZ X" => 14 characters minimum */
361:
362: if ((httpReponse.length() < 14)
363: || (httpReponse.charAt(8) != ' ')
364: || (httpReponse.charAt(12) != ' '))
365: throw new IOException(
366: "The proxy did not send back a valid HTTP response.");
367:
368: int errorCode = 0;
369:
370: try {
371: errorCode = Integer.parseInt(httpReponse.substring(9,
372: 12));
373: } catch (NumberFormatException ignore) {
374: throw new IOException(
375: "The proxy did not send back a valid HTTP response.");
376: }
377:
378: if ((errorCode < 0) || (errorCode > 999))
379: throw new IOException(
380: "The proxy did not send back a valid HTTP response.");
381:
382: if (errorCode != 200) {
383: throw new HTTPProxyException(httpReponse.substring(13),
384: errorCode);
385: }
386:
387: /* OK, read until empty line */
388:
389: while (true) {
390: len = ClientServerHello.readLineRN(in, buffer);
391: if (len == 0)
392: break;
393: }
394: return;
395: }
396:
397: throw new IOException("Unsupported ProxyData");
398: }
399:
400: public void initialize(CryptoWishList cwl,
401: ServerHostKeyVerifier verifier, DHGexParameters dhgex,
402: int connectTimeout, SecureRandom rnd, ProxyData proxyData)
403: throws IOException {
404: /* First, establish the TCP connection to the SSH-2 server */
405:
406: establishConnection(proxyData, connectTimeout);
407:
408: /* Parse the server line and say hello - important: this information is later needed for the
409: * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
410: * for later use.
411: */
412:
413: ClientServerHello csh = new ClientServerHello(sock
414: .getInputStream(), sock.getOutputStream());
415:
416: tc = new TransportConnection(sock.getInputStream(), sock
417: .getOutputStream(), rnd);
418:
419: km = new KexManager(this , csh, cwl, hostname, port, verifier,
420: rnd);
421: km.initiateKEX(cwl, dhgex);
422:
423: receiveThread = new Thread(new Runnable() {
424: public void run() {
425: try {
426: receiveLoop();
427: } catch (IOException e) {
428: close(e, false);
429:
430: if (log.isEnabled())
431: log.log(10,
432: "Receive thread: error in receiveLoop: "
433: + e.getMessage());
434: }
435:
436: if (log.isEnabled())
437: log
438: .log(50,
439: "Receive thread: back from receiveLoop");
440:
441: /* Tell all handlers that it is time to say goodbye */
442:
443: if (km != null) {
444: try {
445: km.handleMessage(null, 0);
446: } catch (IOException e) {
447: }
448: }
449:
450: for (int i = 0; i < messageHandlers.size(); i++) {
451: HandlerEntry he = (HandlerEntry) messageHandlers
452: .elementAt(i);
453: try {
454: he.mh.handleMessage(null, 0);
455: } catch (Exception ignore) {
456: }
457: }
458: }
459: });
460:
461: receiveThread.setDaemon(true);
462: receiveThread.start();
463: }
464:
465: public void registerMessageHandler(MessageHandler mh, int low,
466: int high) {
467: HandlerEntry he = new HandlerEntry();
468: he.mh = mh;
469: he.low = low;
470: he.high = high;
471:
472: synchronized (messageHandlers) {
473: messageHandlers.addElement(he);
474: }
475: }
476:
477: public void removeMessageHandler(MessageHandler mh, int low,
478: int high) {
479: synchronized (messageHandlers) {
480: for (int i = 0; i < messageHandlers.size(); i++) {
481: HandlerEntry he = (HandlerEntry) messageHandlers
482: .elementAt(i);
483: if ((he.mh == mh) && (he.low == low)
484: && (he.high == high)) {
485: messageHandlers.removeElementAt(i);
486: break;
487: }
488: }
489: }
490: }
491:
492: public void sendKexMessage(byte[] msg) throws IOException {
493: synchronized (connectionSemaphore) {
494: if (connectionClosed) {
495: throw (IOException) new IOException(
496: "Sorry, this connection is closed.")
497: .initCause(reasonClosedCause);
498: }
499:
500: flagKexOngoing = true;
501:
502: try {
503: tc.sendMessage(msg);
504: } catch (IOException e) {
505: close(e, false);
506: throw e;
507: }
508: }
509: }
510:
511: public void kexFinished() throws IOException {
512: synchronized (connectionSemaphore) {
513: flagKexOngoing = false;
514: connectionSemaphore.notifyAll();
515: }
516: }
517:
518: public void forceKeyExchange(CryptoWishList cwl,
519: DHGexParameters dhgex) throws IOException {
520: km.initiateKEX(cwl, dhgex);
521: }
522:
523: public void changeRecvCipher(BlockCipher bc, MAC mac) {
524: tc.changeRecvCipher(bc, mac);
525: }
526:
527: public void changeSendCipher(BlockCipher bc, MAC mac) {
528: tc.changeSendCipher(bc, mac);
529: }
530:
531: public void sendAsynchronousMessage(byte[] msg) throws IOException {
532: synchronized (asynchronousQueue) {
533: asynchronousQueue.addElement(msg);
534:
535: /* This limit should be flexible enough. We need this, otherwise the peer
536: * can flood us with global requests (and other stuff where we have to reply
537: * with an asynchronous message) and (if the server just sends data and does not
538: * read what we send) this will probably put us in a low memory situation
539: * (our send queue would grow and grow and...) */
540:
541: if (asynchronousQueue.size() > 100)
542: throw new IOException(
543: "Error: the peer is not consuming our asynchronous replies.");
544:
545: /* Check if we have an asynchronous sending thread */
546:
547: if (asynchronousThread == null) {
548: asynchronousThread = new AsynchronousWorker();
549: asynchronousThread.setDaemon(true);
550: asynchronousThread.start();
551:
552: /* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */
553: }
554: }
555: }
556:
557: public void setConnectionMonitors(Vector monitors) {
558: synchronized (this ) {
559: connectionMonitors = (Vector) monitors.clone();
560: }
561: }
562:
563: public void sendMessage(byte[] msg) throws IOException {
564: if (Thread.currentThread() == receiveThread)
565: throw new IOException(
566: "Assertion error: sendMessage may never be invoked by the receiver thread!");
567:
568: synchronized (connectionSemaphore) {
569: while (true) {
570: if (connectionClosed) {
571: throw (IOException) new IOException(
572: "Sorry, this connection is closed.")
573: .initCause(reasonClosedCause);
574: }
575:
576: if (flagKexOngoing == false)
577: break;
578:
579: try {
580: connectionSemaphore.wait();
581: } catch (InterruptedException e) {
582: }
583: }
584:
585: try {
586: tc.sendMessage(msg);
587: } catch (IOException e) {
588: close(e, false);
589: throw e;
590: }
591: }
592: }
593:
594: public void receiveLoop() throws IOException {
595: byte[] msg = new byte[35000];
596:
597: while (true) {
598: int msglen = tc.receiveMessage(msg, 0, msg.length);
599:
600: int type = msg[0] & 0xff;
601:
602: if (type == Packets.SSH_MSG_IGNORE)
603: continue;
604:
605: if (type == Packets.SSH_MSG_DEBUG) {
606: if (log.isEnabled()) {
607: TypesReader tr = new TypesReader(msg, 0, msglen);
608: tr.readByte();
609: tr.readBoolean();
610: StringBuffer debugMessageBuffer = new StringBuffer();
611: debugMessageBuffer.append(tr.readString("UTF-8"));
612:
613: for (int i = 0; i < debugMessageBuffer.length(); i++) {
614: char c = debugMessageBuffer.charAt(i);
615:
616: if ((c >= 32) && (c <= 126))
617: continue;
618: debugMessageBuffer.setCharAt(i, '\uFFFD');
619: }
620:
621: log.log(50, "DEBUG Message from remote: '"
622: + debugMessageBuffer.toString() + "'");
623: }
624: continue;
625: }
626:
627: if (type == Packets.SSH_MSG_UNIMPLEMENTED) {
628: throw new IOException(
629: "Peer sent UNIMPLEMENTED message, that should not happen.");
630: }
631:
632: if (type == Packets.SSH_MSG_DISCONNECT) {
633: TypesReader tr = new TypesReader(msg, 0, msglen);
634: tr.readByte();
635: int reason_code = tr.readUINT32();
636: StringBuffer reasonBuffer = new StringBuffer();
637: reasonBuffer.append(tr.readString("UTF-8"));
638:
639: /*
640: * Do not get fooled by servers that send abnormal long error
641: * messages
642: */
643:
644: if (reasonBuffer.length() > 255) {
645: reasonBuffer.setLength(255);
646: reasonBuffer.setCharAt(254, '.');
647: reasonBuffer.setCharAt(253, '.');
648: reasonBuffer.setCharAt(252, '.');
649: }
650:
651: /*
652: * Also, check that the server did not send charcaters that may
653: * screw up the receiver -> restrict to reasonable US-ASCII
654: * subset -> "printable characters" (ASCII 32 - 126). Replace
655: * all others with 0xFFFD (UNICODE replacement character).
656: */
657:
658: for (int i = 0; i < reasonBuffer.length(); i++) {
659: char c = reasonBuffer.charAt(i);
660:
661: if ((c >= 32) && (c <= 126))
662: continue;
663: reasonBuffer.setCharAt(i, '\uFFFD');
664: }
665:
666: throw new IOException(
667: "Peer sent DISCONNECT message (reason code "
668: + reason_code + "): "
669: + reasonBuffer.toString());
670: }
671:
672: /*
673: * Is it a KEX Packet?
674: */
675:
676: if ((type == Packets.SSH_MSG_KEXINIT)
677: || (type == Packets.SSH_MSG_NEWKEYS)
678: || ((type >= 30) && (type <= 49))) {
679: km.handleMessage(msg, msglen);
680: continue;
681: }
682:
683: MessageHandler mh = null;
684:
685: for (int i = 0; i < messageHandlers.size(); i++) {
686: HandlerEntry he = (HandlerEntry) messageHandlers
687: .elementAt(i);
688: if ((he.low <= type) && (type <= he.high)) {
689: mh = he.mh;
690: break;
691: }
692: }
693:
694: if (mh == null)
695: throw new IOException("Unexpected SSH message (type "
696: + type + ")");
697:
698: mh.handleMessage(msg, msglen);
699: }
700: }
701: }
|