001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.Gnutella;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.lib.aSocket.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031:
032: import java.util.*;
033: import java.io.*;
034: import java.net.*;
035:
036: /**
037: * A GnutellaServer is a SandStorm stage which allows outgoing connections
038: * to be established to the Gnutella network, and accepts incoming
039: * connections. The server has a client sink associated with it, onto which
040: * GnutellaConnection and GnutellaPacket events are pushed.
041: * When a connection is closed, a SinkClosedEvent is pushed, with the
042: * sink pointer set to the GnutellaConnection that closed. If a an
043: * outgoing connection fails, a GnutellaConnectFailedevent is pushed.
044: *
045: * @author Matt Welsh (mdw@cs.berkeley.edu)
046: * @see GnutellaConnection, GnutellaPacket
047: *
048: */
049: public class GnutellaServer implements EventHandlerIF, GnutellaConst {
050:
051: private static final boolean DEBUG = false;
052:
053: private boolean acceptIncoming;
054: private boolean connectUpstream;
055: private String hostname;
056: private int port;
057: private int listenPort;
058:
059: private ATcpServerSocket servsock;
060: private ATcpClientSocket clisock;
061: private ManagerIF mgr;
062: private SinkIF mySink, clientSink;
063:
064: // ATcpConnection -> GnutellaPacketReader
065: private Hashtable readerTable;
066: // ATcpConnection -> GnutellaConnection
067: private Hashtable connTable;
068: // ATcpConnection -> connectionState
069: private Hashtable newConnTable;
070: // InetAddress -> ATcpClientSocket (self)
071: private Hashtable pendingConnTable;
072:
073: private Vector activeConnections;
074:
075: private static int num_svrs;
076: private static byte connectMsg[];
077: private static byte connectReplyMsg[];
078:
079: // Get byte arrays for the handshake messages
080: static {
081: ByteArrayOutputStream baos = new ByteArrayOutputStream();
082: PrintWriter ps = new PrintWriter(baos);
083: ps.print(GNUTELLA_CONNECT);
084: ps.flush();
085: connectMsg = baos.toByteArray();
086: baos = new ByteArrayOutputStream();
087: ps = new PrintWriter(baos);
088: ps.print(GNUTELLA_OK);
089: ps.flush();
090: connectReplyMsg = baos.toByteArray();
091: }
092:
093: /**
094: * Create a Gnutella server listening for incoming connections on
095: * the default port of 6346.
096: */
097: public GnutellaServer(ManagerIF mgr, SinkIF clientSink)
098: throws Exception {
099: this (mgr, clientSink, DEFAULT_GNUTELLA_PORT);
100: }
101:
102: /**
103: * Create a Gnutella server listening for incoming connections on
104: * the given listenPort. If listenPort == 0, no incoming connections
105: * will be accepted. (Outgoing connections can still be established
106: * using openConnection.)
107: */
108: public GnutellaServer(ManagerIF mgr, SinkIF clientSink,
109: int listenPort) throws Exception {
110: this .mgr = mgr;
111: this .clientSink = clientSink;
112: this .listenPort = listenPort;
113:
114: if (listenPort == 0) {
115: acceptIncoming = false;
116: } else {
117: acceptIncoming = true;
118: }
119: this .readerTable = new Hashtable(1);
120: this .connTable = new Hashtable(1);
121: this .newConnTable = new Hashtable(1);
122: this .pendingConnTable = new Hashtable(1);
123: this .activeConnections = new Vector(1);
124:
125: // Create the stage and register it
126: mgr.createStage("GnutellaServer " + num_svrs + " <port "
127: + listenPort + ">", this , null);
128: }
129:
130: public void init(ConfigDataIF config) throws IOException {
131: mySink = config.getStage().getSink();
132:
133: if (connectUpstream) {
134: clisock = new ATcpClientSocket(hostname, port, mySink,
135: WRITE_CLOG_THRESHOLD, -1);
136: }
137:
138: if (acceptIncoming) {
139: servsock = new ATcpServerSocket(listenPort, mySink,
140: WRITE_CLOG_THRESHOLD);
141: }
142: }
143:
144: public void destroy() {
145: }
146:
147: /**
148: * Open a connection to the given hostname and port. When the
149: * connection is established, a GnutellaConnection will be pushed to
150: * this server's client sink.
151: */
152: public void openConnection(String hostname, int port)
153: throws UnknownHostException {
154: if (DEBUG)
155: System.err.println("GnutellaServer: Opening connection to "
156: + hostname + ":" + port);
157: ATcpClientSocket clisock = new ATcpClientSocket(hostname, port,
158: mySink, WRITE_CLOG_THRESHOLD, -1);
159: pendingConnTable.put(clisock, clisock);
160: }
161:
162: /**
163: * Open a connection to the given address and port. When the
164: * connection is established, a GnutellaConnection will be pushed to
165: * this server's client sink.
166: */
167: public void openConnection(InetAddress address, int port) {
168: if (DEBUG)
169: System.err.println("GS: Opening connection to " + address
170: + ":" + port);
171: ATcpClientSocket clisock = new ATcpClientSocket(address, port,
172: mySink, WRITE_CLOG_THRESHOLD, -1);
173: pendingConnTable.put(clisock, clisock);
174: }
175:
176: // Main event handler
177: public void handleEvent(QueueElementIF qel) {
178: if (DEBUG)
179: System.err.println("GnutellaServer got qel: " + qel);
180:
181: if (qel instanceof ATcpInPacket) {
182: ATcpInPacket pkt = (ATcpInPacket) qel;
183: if (newConnTable.get(pkt.getConnection()) != null) {
184: // New connection - handle handshake
185: handleHandshake(pkt);
186: } else {
187: continuePacket((ATcpInPacket) qel);
188: }
189:
190: } else if (qel instanceof ATcpConnection) {
191: ATcpConnection conn = (ATcpConnection) qel;
192: handleIncomingConnection(conn);
193:
194: } else if (qel instanceof aSocketErrorEvent) {
195: System.err.println("GnutellaServer got error: "
196: + qel.toString());
197:
198: if (qel instanceof ATcpConnectFailedEvent) {
199: ATcpConnectFailedEvent failed = (ATcpConnectFailedEvent) qel;
200: pendingConnTable.remove(failed.getSocket());
201: GnutellaConnectFailedEvent cfe = new GnutellaConnectFailedEvent(
202: (ATcpClientSocket) failed.getSocket());
203: clientSink.enqueue_lossy(cfe);
204: }
205:
206: } else if (qel instanceof SinkDrainedEvent) {
207: // Ignore
208:
209: } else if (qel instanceof SinkCloggedEvent) {
210: // Some connection is clogged; tell the user
211: SinkCloggedEvent sce = (SinkCloggedEvent) qel;
212: GnutellaConnection gc = (GnutellaConnection) connTable
213: .get(sce.sink);
214: if (gc != null)
215: clientSink
216: .enqueue_lossy(new SinkCloggedEvent(gc, null));
217:
218: } else if (qel instanceof SinkClosedEvent) {
219: // Some connection closed; tell the user
220: SinkClosedEvent sce = (SinkClosedEvent) qel;
221: GnutellaConnection gc = (GnutellaConnection) connTable
222: .get(sce.sink);
223: if (gc != null)
224: clientSink.enqueue_lossy(new SinkClosedEvent(gc));
225: cleanupConnection((ATcpConnection) sce.sink, gc);
226: }
227: }
228:
229: public void handleEvents(QueueElementIF[] qelarr) {
230: for (int i = 0; i < qelarr.length; i++) {
231: handleEvent(qelarr[i]);
232: }
233: }
234:
235: private void continuePacket(ATcpInPacket pkt) {
236: GnutellaConnection gc = (GnutellaConnection) connTable.get(pkt
237: .getConnection());
238: if (gc == null) {
239: System.err
240: .println("GS: Warning: continuePacket got packet for bad connection: "
241: + pkt);
242: return;
243: }
244: GnutellaPacketReader gpr = gc.getReader();
245:
246: try {
247: gpr.pushPacket(pkt);
248: GnutellaPacket gp = gpr.getGnutellaPacket();
249:
250: // May have multiple GnutellaPackets pending
251: while (gp != null) {
252: if (DEBUG)
253: System.err
254: .println("GnutellaServer: Finished reading packet");
255: gp.setConnection(gc);
256: if (!clientSink.enqueue_lossy(gp)) {
257: //System.err.println("GS: Warning: Cannot enqueue_lossy packet "+gp);
258: }
259:
260: gp = gpr.getGnutellaPacket();
261: }
262:
263: } catch (IOException e) {
264: //System.err.println("GnutellaServer: Got exception reading packet: "+e);
265: // XXX SHould drop packet and close connection
266: return;
267: }
268: }
269:
270: private void handleIncomingConnection(ATcpConnection conn) {
271: if (DEBUG)
272: System.err
273: .println("GnutellaServer: handleIncomingConnection called on "
274: + conn);
275:
276: if (conn.getServerSocket() != null) {
277: // Incoming connection on server socket
278: if (DEBUG)
279: System.err
280: .println("GnutellaServer: new connection on server socket");
281: newConnTable.put(conn, new connectionState(true));
282: } else {
283: // Upstream connection established
284: if (DEBUG)
285: System.err
286: .println("GnutellaServer: upstream connection established");
287: pendingConnTable.remove(conn.getClientSocket());
288: newConnTable.put(conn, new connectionState(false));
289: SinkIF upstream = (SinkIF) conn;
290: // Send the connect message
291:
292: if (DEBUG)
293: System.err.println("GnutellaServer: Sending handshake "
294: + new String(connectMsg));
295: sendBytes(upstream, connectMsg);
296: }
297:
298: // Profile the connection if profiling enabled
299: ProfilerIF profiler = mgr.getProfiler();
300: SandstormConfig cfg = mgr.getConfig();
301: if ((profiler != null)
302: && (cfg.getBoolean("global.profile.sockets")))
303: profiler.add(conn.toString(), conn);
304:
305: if (DEBUG)
306: System.err
307: .println("GnutellaServer: handleIncomingConnection doing startReader");
308: conn.startReader(mySink);
309: }
310:
311: // Inform user of connection
312: private void pushNewConnection(ATcpConnection conn) {
313: GnutellaConnection gc = new GnutellaConnection(this , conn);
314:
315: connTable.put(conn, gc);
316: activeConnections.addElement(gc);
317: if (!clientSink.enqueue_lossy(gc)) {
318: System.err.println("GS: Warning: Cannot enqueue_lossy "
319: + gc);
320: }
321: }
322:
323: void closeConnection(ATcpConnection tcpconn, SinkIF compQ) {
324: try {
325: tcpconn.close(compQ);
326: } catch (SinkClosedException e) {
327: // Ignore
328: }
329: }
330:
331: void cleanupConnection(ATcpConnection tcpconn, GnutellaConnection gc) {
332: readerTable.remove(tcpconn);
333: connTable.remove(tcpconn);
334: newConnTable.remove(tcpconn);
335: if (gc != null)
336: activeConnections.removeElement(gc);
337: }
338:
339: private void handleHandshake(ATcpInPacket pkt) {
340: if (DEBUG)
341: System.err.println("GnutellaServer: handleHandshake for "
342: + pkt + ", conn " + pkt.getConnection());
343: connectionState cs = (connectionState) newConnTable.get(pkt
344: .getConnection());
345: if (DEBUG)
346: System.err.println("GnutellaServer: cs.is_incoming is "
347: + cs.is_incoming);
348:
349: boolean done;
350: try {
351: done = cs.process(pkt);
352: } catch (IOException e) {
353: // Got back packet
354: if (DEBUG)
355: System.err.println("GnutellaServer: got bad handshake");
356: try {
357: pkt.getConnection().close(null);
358: } catch (SinkClosedException sde) {
359: // Ignore
360: }
361: newConnTable.remove(pkt.getConnection());
362: return;
363: }
364: if (done) {
365: if (DEBUG)
366: System.err
367: .println("GnutellaServer: handshake complete");
368:
369: // Finished handshake
370: if (cs.is_incoming) {
371: // Is an incoming connection - got connect message
372: SinkIF sink = (SinkIF) pkt.getConnection();
373: sendBytes(sink, connectReplyMsg);
374: if (DEBUG)
375: System.err
376: .println("GnutellaServer: send connect reply msg");
377: newConnTable.remove(pkt.getConnection());
378: pushNewConnection(pkt.getConnection());
379:
380: } else {
381: // Upstream connection - got the reply message
382: newConnTable.remove(pkt.getConnection());
383: pushNewConnection(pkt.getConnection());
384: }
385: }
386: }
387:
388: private void sendBytes(SinkIF sink, byte msg[]) {
389: BufferElement buf = new BufferElement(msg);
390: try {
391: sink.enqueue(buf);
392: } catch (SinkFullException sfe) {
393: System.err
394: .println("GnutellaServer: Got sink full exception in sendBytes");
395: } catch (SinkException sde) {
396: System.err
397: .println("GnutellaServer: Got sink exception in sendBytes");
398: // XXX MDW: Need to close connection?
399: }
400:
401: }
402:
403: public String toString() {
404: String s = "GnutellaServer ";
405: if (connectUpstream) {
406: s += "[" + hostname + ":" + port + "]";
407: }
408: if (acceptIncoming) {
409: s += "[listen=" + listenPort + "]";
410: }
411: return s;
412: }
413:
414: /**
415: * Register a sink to receive incoming packets on this
416: * connection.
417: */
418: public void registerSink(SinkIF sink) {
419: this .clientSink = sink;
420: }
421:
422: // Return my sink so that GnutellaConnection can redirect
423: // packet completions to it
424: SinkIF getSink() {
425: return mySink;
426: }
427:
428: /**
429: * Send a packet to all nodes but the given node. Useful for packet
430: * routing.
431: */
432: public void sendToAllButOne(GnutellaPacket pkt,
433: GnutellaConnection exclude) {
434:
435: for (int i = 0; i < activeConnections.size(); i++) {
436: GnutellaConnection gc = (GnutellaConnection) activeConnections
437: .elementAt(i);
438: if (!gc.equals(exclude)) {
439: if (!gc.enqueue_lossy(pkt)) {
440: System.err
441: .println("GS: Warning: Could not enqueue_lossy packet to "
442: + gc);
443: }
444: }
445: }
446: }
447:
448: /**
449: * Internal class used to monitor state of connections during
450: * handshake phase
451: */
452: class connectionState {
453: boolean is_incoming;
454: byte barr[];
455: byte target[];
456: int cur_offset, cur_length_target;
457:
458: connectionState(boolean is_incoming) {
459: this .is_incoming = is_incoming;
460: if (is_incoming) {
461: barr = new byte[connectMsg.length];
462: cur_offset = 0;
463: cur_length_target = barr.length;
464: target = connectMsg;
465: } else {
466: barr = new byte[connectReplyMsg.length];
467: cur_offset = 0;
468: cur_length_target = barr.length;
469: target = connectReplyMsg;
470: }
471: }
472:
473: // Process a packet and see if it matches the target
474: boolean process(ATcpInPacket packet) throws IOException {
475: byte in[] = packet.getBytes();
476: if (DEBUG)
477: System.err
478: .println("GnutellaServer: process got bytes: "
479: + new String(in));
480:
481: int c;
482: if (DEBUG)
483: System.err.println("GnutellaServer: in.length="
484: + in.length + ", cur_off=" + cur_offset
485: + ", lt=" + cur_length_target);
486:
487: if (in.length < cur_length_target - cur_offset) {
488: c = in.length;
489: } else {
490: c = cur_length_target - cur_offset;
491: }
492: System.arraycopy(in, 0, barr, cur_offset, c);
493: cur_offset += c;
494:
495: if (cur_offset == cur_length_target) {
496: boolean match = true;
497: for (int i = 0; i < barr.length; i++) {
498: if (barr[i] != target[i])
499: match = false;
500: }
501: if (match)
502: return true;
503: else
504: throw new IOException(
505: "process got bad handshake packet");
506: }
507:
508: return false;
509: }
510:
511: }
512:
513: }
|