0001: // serverCore.java
0002: // -------------------------------------------
0003: // (C) by Michael Peter Christen; mc@anomic.de
0004: // first published on http://www.anomic.de
0005: // Frankfurt, Germany, 2002-2004
0006: //
0007: // $LastChangedDate: 2008-01-06 19:23:38 +0000 (So, 06 Jan 2008) $
0008: // $LastChangedRevision: 4305 $
0009: // $LastChangedBy: orbiter $
0010: //
0011: // ThreadPool
0012: //
0013: // This program is free software; you can redistribute it and/or modify
0014: // it under the terms of the GNU General Public License as published by
0015: // the Free Software Foundation; either version 2 of the License, or
0016: // (at your option) any later version.
0017: //
0018: // This program is distributed in the hope that it will be useful,
0019: // but WITHOUT ANY WARRANTY; without even the implied warranty of
0020: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0021: // GNU General Public License for more details.
0022: //
0023: // You should have received a copy of the GNU General Public License
0024: // along with this program; if not, write to the Free Software
0025: // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0026: //
0027: // Using this software in any meaning (reading, learning, copying, compiling,
0028: // running) means that you agree that the Author(s) is (are) not responsible
0029: // for cost, loss of data or any harm that may be caused directly or indirectly
0030: // by usage of this softare or this documentation. The usage of this software
0031: // is on your own risk. The installation and usage (starting/running) of this
0032: // software may allow other people or application to access your computer and
0033: // any attached devices and is highly dependent on the configuration of the
0034: // software which must be done by the user of the software; the author(s) is
0035: // (are) also not responsible for proper configuration and usage of the
0036: // software, even if provoked by documentation provided together with
0037: // the software.
0038: //
0039: // Any changes to this file according to the GPL as documented in the file
0040: // gpl.txt aside this file in the shipment you received can be done to the
0041: // lines that follows this copyright notice here, but changes must not be
0042: // done inside the copyright notive above. A re-distribution must contain
0043: // the intact and unchanged copyright notice.
0044: // Contributions and changes to the program code must be marked as such.
0045:
0046: package de.anomic.server;
0047:
0048: // standard server
0049: import java.io.FileInputStream;
0050: import java.io.FileOutputStream;
0051: import java.io.IOException;
0052: import java.io.InputStream;
0053: import java.io.OutputStream;
0054: import java.io.PushbackInputStream;
0055: import java.lang.reflect.InvocationTargetException;
0056: import java.lang.reflect.Method;
0057: import java.net.Inet4Address;
0058: import java.net.InetAddress;
0059: import java.net.InetSocketAddress;
0060: import java.net.NetworkInterface;
0061: import java.net.ServerSocket;
0062: import java.net.Socket;
0063: import java.net.SocketException;
0064: import java.nio.channels.ClosedByInterruptException;
0065: import java.security.KeyStore;
0066: import java.util.Enumeration;
0067: import java.util.HashMap;
0068: import java.util.HashSet;
0069: import java.util.Iterator;
0070:
0071: import javax.net.ssl.HandshakeCompletedEvent;
0072: import javax.net.ssl.HandshakeCompletedListener;
0073: import javax.net.ssl.KeyManagerFactory;
0074: import javax.net.ssl.SSLContext;
0075: import javax.net.ssl.SSLSocket;
0076: import javax.net.ssl.SSLSocketFactory;
0077:
0078: import de.anomic.icap.icapd;
0079: import de.anomic.server.logging.serverLog;
0080: import de.anomic.server.portForwarding.serverPortForwarding;
0081: import de.anomic.tools.PKCS12Tool;
0082: import de.anomic.urlRedirector.urlRedirectord;
0083: import de.anomic.yacy.yacyCore;
0084: import de.anomic.yacy.yacySeed;
0085:
0086: public final class serverCore extends serverAbstractThread implements
0087: serverThread {
0088:
0089: // special ASCII codes used for protocol handling
0090: public static final byte HT = 9; // Horizontal Tab
0091: public static final byte LF = 10; // Line Feed
0092: public static final byte CR = 13; // Carriage Return
0093: public static final byte SP = 32; // Space
0094: public static final byte[] CRLF = { CR, LF }; // Line End of HTTP/ICAP headers
0095: public static final String CRLF_STRING = new String(CRLF);
0096: public static final String LF_STRING = new String(new byte[] { LF });
0097: public static final Class<?>[] stringType = { "".getClass() }; // set up some reflection
0098: public static final long startupTime = System.currentTimeMillis();
0099: public static final ThreadGroup sessionThreadGroup = new ThreadGroup(
0100: "sessionThreadGroup");
0101: private static int sessionCounter = 0; // will be increased with each session and is used to return a hash code
0102:
0103: // static variables
0104: public static final Boolean TERMINATE_CONNECTION = Boolean.FALSE;
0105: public static final Boolean RESUME_CONNECTION = Boolean.TRUE;
0106: public static HashMap<String, Integer> bfHost = new HashMap<String, Integer>(); // for brute-force prevention
0107:
0108: // class variables
0109: private String extendedPort; // the port, which is visible from outside (in most cases bind-port)
0110: private String bindPort; // if set, yacy will bind to this port, but set extendedPort in the seed
0111: public boolean forceRestart = false; // specifies if the server should try to do a restart
0112:
0113: public static boolean portForwardingEnabled = false;
0114: public static boolean useStaticIP = false;
0115: public static serverPortForwarding portForwarding = null;
0116:
0117: private SSLSocketFactory sslSocketFactory = null;
0118: private ServerSocket socket; // listener
0119: serverLog log; // log object
0120: private int timeout; // connection time-out of the socket
0121: serverHandler handlerPrototype; // the command class (a serverHandler)
0122:
0123: private serverSwitch switchboard; // the command class switchboard
0124: HashMap<String, String> denyHost;
0125: int commandMaxLength;
0126: private int maxBusySessions;
0127: private HashSet<Session> busySessions;
0128:
0129: /*
0130: private static ServerSocketFactory getServerSocketFactory(boolean dflt, File keyfile, String passphrase) {
0131: // see doc's at
0132: // http://java.sun.com/developer/technicalArticles/Security/secureinternet/
0133: if (dflt) {
0134: return ServerSocketFactory.getDefault();
0135: } else {
0136: SSLServerSocketFactory ssf = null;
0137: try {
0138: // set up key manager to do server authentication
0139: SSLContext ctx;
0140: KeyManagerFactory kmf;
0141: KeyStore ks;
0142: char[] pp = passphrase.toCharArray();
0143:
0144: // open keystore
0145: ks = KeyStore.getInstance("JKS");
0146: ks.load(new FileInputStream(keyfile), pp);
0147:
0148: // get a KeyManager Factory
0149: String algorithm = KeyManagerFactory.getDefaultAlgorithm(); // should be "SunX509"
0150: kmf = KeyManagerFactory.getInstance(algorithm);
0151: kmf.init(ks, pp);
0152:
0153: // create a ssl context with the keyManager Factory
0154: //ctx = SSLContext.getInstance("TLS");
0155: ctx = SSLContext.getInstance("SSLv3");
0156:
0157: ctx.init(kmf.getKeyManagers(), null, null);
0158:
0159: ssf = ctx.getServerSocketFactory();
0160: return ssf;
0161: } catch (Exception e) {
0162: e.printStackTrace();
0163: return null;
0164: }
0165: }
0166: }
0167: */
0168:
0169: public static String clientAddress(Socket s) {
0170: InetAddress uAddr = s.getInetAddress();
0171: if (uAddr.isAnyLocalAddress())
0172: return "localhost";
0173: String cIP = uAddr.getHostAddress();
0174: if (cIP.equals("0:0:0:0:0:0:0:1"))
0175: cIP = "localhost";
0176: if (cIP.equals("127.0.0.1"))
0177: cIP = "localhost";
0178: return cIP;
0179: }
0180:
0181: // class initializer
0182: public serverCore(int timeout, boolean blockAttack,
0183: serverHandler handlerPrototype, serverSwitch switchboard,
0184: int commandMaxLength) {
0185: this .timeout = timeout;
0186:
0187: this .commandMaxLength = commandMaxLength;
0188: this .denyHost = (blockAttack) ? new HashMap<String, String>()
0189: : null;
0190: this .handlerPrototype = handlerPrototype;
0191: this .switchboard = switchboard;
0192:
0193: // initialize logger
0194: this .log = new serverLog("SERVER");
0195:
0196: // init the ssl socket factory
0197: this .sslSocketFactory = initSSLFactory();
0198:
0199: // init session parameter
0200: maxBusySessions = Integer.valueOf(
0201: switchboard.getConfig("httpdMaxBusySessions", "100"))
0202: .intValue();
0203: busySessions = new HashSet<Session>();
0204:
0205: // init servercore
0206: init();
0207: }
0208:
0209: public boolean withSSL() {
0210: return this .sslSocketFactory != null;
0211: }
0212:
0213: public synchronized void init() {
0214: this .log.logInfo("Initializing serverCore ...");
0215:
0216: // read some config values
0217: this .extendedPort = this .switchboard.getConfig("port", "8080")
0218: .trim();
0219: this .bindPort = this .switchboard.getConfig("bindPort", "")
0220: .trim();
0221:
0222: // Open a new server-socket channel
0223: try {
0224: // bind the ServerSocket to a specific address
0225: // InetSocketAddress bindAddress = null;
0226: this .socket = new ServerSocket();
0227: if (bindPort == null || bindPort.equals("")) {
0228: this .log.logInfo("Trying to bind server to port "
0229: + extendedPort);
0230: this .socket
0231: .bind(/*bindAddress = */generateSocketAddress(extendedPort));
0232: } else { //bindPort set, use another port to bind than the port reachable from outside
0233: this .log.logInfo("Trying to bind server to port "
0234: + bindPort + " with " + extendedPort
0235: + "as seedPort.");
0236: this .socket
0237: .bind(/*bindAddress = */generateSocketAddress(bindPort));
0238: }
0239:
0240: // updating the port information
0241: //yacyCore.seedDB.mySeed.put(yacySeed.PORT,Integer.toString(bindAddress.getPort()));
0242: yacyCore.seedDB.mySeed().put(yacySeed.PORT, extendedPort);
0243: } catch (Exception e) {
0244: String errorMsg = "FATAL ERROR: "
0245: + e.getMessage()
0246: + " - probably root access rights needed. check port number";
0247: this .log.logSevere(errorMsg);
0248: System.out.println(errorMsg);
0249: System.exit(0);
0250: }
0251:
0252: // init port forwarding
0253: try {
0254: this .initPortForwarding();
0255: } catch (Exception e) {
0256: this .log.logSevere(
0257: "Unable to initialize server port forwarding.", e);
0258: this .switchboard
0259: .setConfig("portForwardingEnabled", "false");
0260: } catch (Error e) {
0261: this .log.logSevere(
0262: "Unable to initialize server port forwarding.", e);
0263: this .switchboard
0264: .setConfig("portForwardingEnabled", "false");
0265: }
0266:
0267: }
0268:
0269: public static int getPortNr(String extendedPortString) {
0270: int pos = -1;
0271: if ((pos = extendedPortString.indexOf(":")) != -1) {
0272: extendedPortString = extendedPortString.substring(pos + 1);
0273: }
0274: return Integer.parseInt(extendedPortString);
0275: }
0276:
0277: public InetSocketAddress generateSocketAddress(
0278: String extendedPortString) throws SocketException {
0279:
0280: // parsing the port configuration
0281: String bindIP = null;
0282: int bindPort;
0283:
0284: int pos = -1;
0285: if ((pos = extendedPortString.indexOf(":")) != -1) {
0286: bindIP = extendedPortString.substring(0, pos).trim();
0287: extendedPortString = extendedPortString.substring(pos + 1);
0288:
0289: if (bindIP.startsWith("#")) {
0290: String interfaceName = bindIP.substring(1);
0291: String hostName = null;
0292: this .log
0293: .logFine("Trying to determine IP address of interface '"
0294: + interfaceName + "'.");
0295:
0296: Enumeration<NetworkInterface> interfaces = NetworkInterface
0297: .getNetworkInterfaces();
0298: if (interfaces != null) {
0299: while (interfaces.hasMoreElements()) {
0300: NetworkInterface interf = interfaces
0301: .nextElement();
0302: if (interf.getName().equalsIgnoreCase(
0303: interfaceName)) {
0304: Enumeration<InetAddress> addresses = interf
0305: .getInetAddresses();
0306: if (addresses != null) {
0307: while (addresses.hasMoreElements()) {
0308: InetAddress address = (InetAddress) addresses
0309: .nextElement();
0310: if (address instanceof Inet4Address) {
0311: hostName = address
0312: .getHostAddress();
0313: break;
0314: }
0315: }
0316: }
0317: }
0318: }
0319: }
0320: if (hostName == null) {
0321: this .log
0322: .logWarning("Unable to find interface with name '"
0323: + interfaceName
0324: + "'. Binding server to all interfaces");
0325: bindIP = null;
0326: } else {
0327: this .log.logInfo("Binding server to interface '"
0328: + interfaceName + "' with IP '" + hostName
0329: + "'.");
0330: bindIP = hostName;
0331: }
0332: }
0333: }
0334: bindPort = Integer.parseInt(extendedPortString);
0335:
0336: return (bindIP == null) ? new InetSocketAddress(bindPort)
0337: : new InetSocketAddress(bindIP, bindPort);
0338: }
0339:
0340: public void initPortForwarding() throws Exception {
0341: // doing the port forwarding stuff
0342: if (this .switchboard.getConfigBool("portForwarding.Enabled",
0343: false)) {
0344: this .log.logInfo("Initializing port forwarding ...");
0345: try {
0346: // getting the port forwarding type to use
0347: String forwardingType = this .switchboard.getConfig(
0348: "portForwarding.Type", "none");
0349:
0350: // loading port forwarding class
0351: this .log
0352: .logInfo("Trying to load port forwarding class for forwarding type '"
0353: + forwardingType + "'.");
0354: String forwardingClass = this .switchboard.getConfig(
0355: "portForwarding." + forwardingType, "");
0356:
0357: Class<?> forwarderClass = Class
0358: .forName(forwardingClass);
0359: serverCore.portForwarding = (serverPortForwarding) forwarderClass
0360: .newInstance();
0361:
0362: // initializing port forwarding
0363: String localHost = this .socket.getInetAddress()
0364: .getHostName();
0365: Integer localPort = new Integer(this .socket
0366: .getLocalPort());
0367:
0368: serverCore.portForwarding.init(this .switchboard,
0369: localHost, localPort.intValue());
0370:
0371: // connection to port forwarding host
0372: serverCore.portForwarding.connect();
0373:
0374: serverCore.portForwardingEnabled = true;
0375: yacyCore.seedDB.mySeed().put(yacySeed.IP,
0376: serverDomains.myPublicIP());
0377: yacyCore.seedDB.mySeed().put(
0378: yacySeed.PORT,
0379: Integer.toString(serverCore.portForwarding
0380: .getPort()));
0381: } catch (Exception e) {
0382: serverCore.portForwardingEnabled = false;
0383: this .switchboard.setConfig("portForwarding.Enabled",
0384: "false");
0385: throw e;
0386: } catch (Error e) {
0387: serverCore.portForwardingEnabled = false;
0388: this .switchboard.setConfig("portForwarding.Enabled",
0389: "false");
0390: throw e;
0391: }
0392:
0393: } else {
0394: serverCore.portForwardingEnabled = false;
0395: serverCore.portForwarding = null;
0396: yacyCore.seedDB.mySeed().put(yacySeed.IP,
0397: serverDomains.myPublicIP());
0398: yacyCore.seedDB.mySeed().put(
0399: yacySeed.PORT,
0400: Integer.toString(serverCore
0401: .getPortNr(this .switchboard.getConfig(
0402: "port", "8080"))));
0403: }
0404: if (!this .switchboard.getConfig("staticIP", "").equals(""))
0405: serverCore.useStaticIP = true;
0406:
0407: }
0408:
0409: public void open() {
0410: this .log.logConfig("* server started on "
0411: + serverDomains.myPublicLocalIP() + ":"
0412: + this .extendedPort);
0413: }
0414:
0415: public void freemem() {
0416: // FIXME: can we something here to flush memory? Idea: Reduce the size of some of our various caches.
0417: serverMemory.gc(2000, "serverCore.freemem()"); // thq
0418: }
0419:
0420: // class body
0421: public boolean job() throws Exception {
0422: try {
0423: // prepare for new connection
0424: // idleThreadCheck();
0425: this .switchboard.handleBusyState(this .busySessions.size());
0426: this .log.logFinest("* waiting for connections, "
0427: + this .busySessions.size() + " sessions running");
0428:
0429: announceThreadBlockApply();
0430:
0431: // wait for new connection
0432: Socket controlSocket = this .socket.accept();
0433:
0434: // wrap this socket
0435: if (this .sslSocketFactory != null) {
0436: controlSocket = new serverCoreSocket(controlSocket);
0437:
0438: // if the current connection is SSL we need to do a handshake
0439: if (((serverCoreSocket) controlSocket).isSSL()) {
0440: controlSocket = negotiateSSL(controlSocket);
0441: }
0442: }
0443:
0444: announceThreadBlockRelease();
0445:
0446: String cIP = clientAddress(controlSocket);
0447: //System.out.println("server bfHosts=" + bfHost.toString());
0448: if (bfHost.get(cIP) != null) {
0449: Integer attempts = (Integer) bfHost.get(cIP);
0450: if (attempts == null)
0451: attempts = new Integer(1);
0452: else
0453: attempts = new Integer(attempts.intValue() + 1);
0454: bfHost.put(cIP, attempts);
0455: this .log
0456: .logWarning("SLOWING DOWN ACCESS FOR BRUTE-FORCE PREVENTION FROM "
0457: + cIP
0458: + ", ATTEMPT "
0459: + attempts.intValue());
0460: // add a delay to make brute-force harder
0461: announceThreadBlockApply();
0462: try {
0463: Thread.sleep(attempts.intValue() * 2000);
0464: } catch (InterruptedException e) {
0465: }
0466: announceThreadBlockRelease();
0467: if ((attempts.intValue() >= 10)
0468: && (this .denyHost != null)) {
0469: this .denyHost.put(cIP, "deny");
0470: }
0471: }
0472:
0473: if ((this .denyHost == null)
0474: || (this .denyHost.get(cIP) == null)) {
0475: // setting the timeout properly
0476: controlSocket.setSoTimeout(this .timeout);
0477:
0478: // create session
0479: Session connection = new Session(sessionThreadGroup,
0480: controlSocket, this .timeout);
0481: this .busySessions.add(connection);
0482: } else {
0483: this .log.logWarning("ACCESS FROM " + cIP + " DENIED");
0484: }
0485:
0486: return true;
0487: } catch (SocketException e) {
0488: if (this .forceRestart) {
0489: // reinitialize serverCore
0490: init();
0491: this .forceRestart = false;
0492: return true;
0493: }
0494: throw e;
0495: }
0496: }
0497:
0498: public synchronized void close() {
0499: // consuming the isInterrupted Flag. Otherwise we could not properly close the session pool
0500: Thread.interrupted();
0501:
0502: // closing the port forwarding channel
0503: if ((portForwardingEnabled) && (portForwarding != null)) {
0504: try {
0505: this .log.logInfo("Shutdown port forwarding ...");
0506: portForwarding.disconnect();
0507: portForwardingEnabled = false;
0508: portForwarding = null;
0509: } catch (Exception e) {
0510: this .log
0511: .logWarning("Unable to shutdown the port forwarding channel.");
0512: }
0513: }
0514:
0515: // close the serverchannel and socket
0516: try {
0517: this .log.logInfo("Closing server socket ...");
0518: this .socket.close();
0519: } catch (Exception e) {
0520: this .log.logWarning("Unable to close the server socket.");
0521: }
0522:
0523: // close all sessions
0524: this .log.logInfo("Closing server sessions ...");
0525: Iterator<Session> i = this .busySessions.iterator();
0526: Session s;
0527: while (i.hasNext()) {
0528: s = i.next();
0529: s.interrupt();
0530: s.close();
0531: }
0532: this .busySessions = null;
0533:
0534: this .log.logConfig("* terminated");
0535: }
0536:
0537: public int getJobCount() {
0538: return this .busySessions.size();
0539: }
0540:
0541: public int getMaxSessionCount() {
0542: return this .maxBusySessions;
0543: }
0544:
0545: public void setMaxSessionCount(int count) {
0546: this .maxBusySessions = count;
0547: }
0548:
0549: // idle sensor: the thread is idle if there are no sessions running
0550: public boolean idle() {
0551: // idleThreadCheck();
0552: return (this .busySessions.size() == 0);
0553: }
0554:
0555: public final class Session extends Thread {
0556:
0557: boolean destroyed = false;
0558: private boolean running = false;
0559: private boolean stopped = false;
0560:
0561: private long start; // startup time
0562: private serverHandler commandObj;
0563: private HashMap<String, Object> commandObjMethodCache = new HashMap<String, Object>(
0564: 5);
0565:
0566: private String request; // current command line
0567: private int commandCounter; // for logging: number of commands in this session
0568: private String identity; // a string that identifies the client (i.e. ftp: account name)
0569: //private boolean promiscuous; // if true, no lines are read and streams are only passed
0570:
0571: public Socket controlSocket; // dialog socket
0572: public InetAddress userAddress; // the address of the client
0573: public int userPort; // the ip port used by the client
0574: public PushbackInputStream in; // on control input stream
0575: public OutputStream out; // on control output stream, auto-flush
0576: public int socketTimeout;
0577: public int hashIndex;
0578:
0579: public Session(ThreadGroup theThreadGroup,
0580: Socket controlSocket, int socketTimeout) {
0581: super (theThreadGroup, controlSocket.getInetAddress()
0582: .toString()
0583: + "@" + Long.toString(System.currentTimeMillis()));
0584: this .socketTimeout = socketTimeout;
0585: this .controlSocket = controlSocket;
0586: this .hashIndex = sessionCounter;
0587: sessionCounter++;
0588:
0589: if (!this .running) {
0590: // this.setDaemon(true);
0591: this .start();
0592: } else {
0593: this .notifyAll();
0594: }
0595: }
0596:
0597: public int hashCode() {
0598: // return a hash code so it is possible to store objects of httpc objects in a HashSet
0599: return this .hashIndex;
0600: }
0601:
0602: public int getCommandCount() {
0603: return this .commandCounter;
0604: }
0605:
0606: public String getCommandLine() {
0607: return this .request;
0608: }
0609:
0610: public serverHandler getCommandObj() {
0611: return this .commandObj;
0612: }
0613:
0614: public InetAddress getUserAddress() {
0615: return this .userAddress;
0616: }
0617:
0618: public int getUserPort() {
0619: return this .userPort;
0620: }
0621:
0622: public void setStopped(boolean stopped) {
0623: this .stopped = stopped;
0624: }
0625:
0626: public boolean isStopped() {
0627: return this .stopped;
0628: }
0629:
0630: public void close() {
0631: if (this .isAlive()) {
0632: try {
0633: // closing the socket to the client
0634: if ((this .controlSocket != null)
0635: && (this .controlSocket.isConnected())) {
0636: this .controlSocket.close();
0637: serverCore.this .log
0638: .logInfo("Closing main socket of thread '"
0639: + this .getName() + "'");
0640: }
0641: } catch (Exception e) {
0642: }
0643: }
0644: }
0645:
0646: public long getRequestStartTime() {
0647: return this .start;
0648: }
0649:
0650: public long getTime() {
0651: return System.currentTimeMillis() - this .start;
0652: }
0653:
0654: public void setIdentity(String id) {
0655: this .identity = id;
0656: }
0657:
0658: /*
0659: public void setPromiscuous() {
0660: this.promiscuous = true;
0661: }
0662: */
0663:
0664: public void log(boolean outgoing, String request) {
0665: serverCore.this .log.logFine(this .userAddress
0666: .getHostAddress()
0667: + "/"
0668: + this .identity
0669: + " "
0670: + "["
0671: + ((busySessions == null) ? -1 : busySessions
0672: .size())
0673: + ", "
0674: + this .commandCounter
0675: + ((outgoing) ? "] > " : "] < ") + request);
0676: }
0677:
0678: public void writeLine(String messg) throws IOException {
0679: send(this .out, messg + CRLF_STRING);
0680: log(true, messg);
0681: }
0682:
0683: public byte[] readLine() {
0684: return receive(this .in, serverCore.this .commandMaxLength,
0685: false);
0686: }
0687:
0688: /**
0689: * reads a line from the input socket
0690: * this function is provided by the server through a passed method on initialization
0691: * @return the next requestline as string
0692: */
0693: public String readLineAsString() {
0694: byte[] l = readLine();
0695: return (l == null) ? null : new String(l);
0696: }
0697:
0698: /**
0699: * @return whether the {@link Thread} is currently running
0700: */
0701: public boolean isRunning() {
0702: return this .running;
0703: }
0704:
0705: /**
0706: *
0707: *
0708: * @see java.lang.Thread#run()
0709: */
0710: public void run() {
0711: this .running = true;
0712:
0713: try {
0714: // setting the session startup time
0715: this .start = System.currentTimeMillis();
0716:
0717: // set the session identity
0718: this .identity = "-";
0719:
0720: // getting some client information
0721: this .userAddress = this .controlSocket.getInetAddress();
0722: this .userPort = this .controlSocket.getPort();
0723: this .setName("Session_"
0724: + this .userAddress.getHostAddress() + ":"
0725: + this .controlSocket.getPort());
0726:
0727: // TODO: check if we want to allow this socket to connect us
0728:
0729: // getting input and output stream for communication with client
0730: if (this .controlSocket.getInputStream() instanceof PushbackInputStream) {
0731: this .in = (PushbackInputStream) this .controlSocket
0732: .getInputStream();
0733: } else {
0734: this .in = new PushbackInputStream(
0735: this .controlSocket.getInputStream());
0736: }
0737: this .out = this .controlSocket.getOutputStream();
0738:
0739: // reseting the command counter
0740: this .commandCounter = 0;
0741:
0742: // listen for commands
0743: listen();
0744: } catch (Exception e) {
0745: System.err.println("ERROR: (internal) " + e);
0746: } finally {
0747: try {
0748: if (this .controlSocket.isClosed())
0749: return;
0750:
0751: // flush data
0752: this .out.flush();
0753:
0754: // maybe this doesn't work for all SSL socket implementations
0755: if (!(this .controlSocket instanceof SSLSocket)) {
0756: this .controlSocket.shutdownInput();
0757: this .controlSocket.shutdownOutput();
0758: }
0759:
0760: // close streams
0761: this .in.close();
0762: this .out.close();
0763:
0764: // sleep for a while
0765: try {
0766: Thread.sleep(1000);
0767: } catch (InterruptedException e) {
0768: }
0769:
0770: // close everything
0771: this .controlSocket.close();
0772: this .controlSocket = null;
0773:
0774: } catch (IOException e) {
0775: e.printStackTrace();
0776: }
0777: busySessions.remove(this );
0778: }
0779:
0780: }
0781:
0782: private void listen() {
0783: try {
0784:
0785: Object result;
0786: // // send greeting
0787: // Object result = commandObj.greeting();
0788: // if (result != null) {
0789: // if ((result instanceof String) && (((String) result).length() > 0)) writeLine((String) result);
0790: // }
0791:
0792: // start dialog
0793: byte[] requestBytes = null;
0794: boolean terminate = false;
0795: String reqCmd;
0796: String reqProtocol = "HTTP";
0797: Object[] stringParameter = new String[1];
0798: while ((this .in != null)
0799: && ((requestBytes = readLine()) != null)) {
0800: this .setName("Session_"
0801: + this .userAddress.getHostAddress() + ":"
0802: + this .controlSocket.getPort() + "#"
0803: + this .commandCounter);
0804:
0805: this .request = new String(requestBytes);
0806: //log.logDebug("* session " + handle + " received command '" + request + "'. time = " + (System.currentTimeMillis() - handle));
0807: log(false, this .request);
0808: try {
0809: // if we can not determine the proper command string we try to call function emptyRequest
0810: // of the commandObject
0811: if (this .request.trim().length() == 0)
0812: this .request = "EMPTY";
0813:
0814: // getting the rest of the request parameters
0815: int pos = this .request.indexOf(' ');
0816: if (pos < 0) {
0817: reqCmd = this .request.trim().toUpperCase();
0818: stringParameter[0] = "";
0819: } else {
0820: reqCmd = this .request.substring(0, pos)
0821: .trim().toUpperCase();
0822: stringParameter[0] = this .request
0823: .substring(pos).trim();
0824: }
0825:
0826: // now we need to initialize the session
0827: if (this .commandCounter == 0) {
0828: // first we need to determine the proper protocol handler
0829: if (this .request.indexOf("ICAP") >= 0)
0830: reqProtocol = "ICAP";
0831: else if (this .request
0832: .startsWith("REDIRECTOR"))
0833: reqProtocol = "REDIRECTOR";
0834: else
0835: reqProtocol = "HTTP";
0836:
0837: // next we need to get the proper protocol handler
0838: if (reqProtocol.equals("ICAP")) {
0839: this .commandObj = new icapd();
0840: } else if (reqProtocol.equals("REDIRECTOR")) {
0841: this .commandObj = new urlRedirectord();
0842: } else {
0843: // if ((this.commandObj != null) &&
0844: // (this.commandObj.getClass().getName().equals(serverCore.this.handlerPrototype.getClass().getName()))) {
0845: // this.commandObj.reset();
0846: // } else {
0847: // this.commandObj = (serverHandler) serverCore.this.handlerPrototype.clone();
0848: // }
0849:
0850: this .commandObj = (serverHandler) serverCore.this .handlerPrototype
0851: .clone();
0852: }
0853:
0854: // initializing the session
0855: this .commandObj.initSession(this );
0856: }
0857:
0858: // count the amount of requests that were processed by this session until yet
0859: this .commandCounter++;
0860:
0861: // setting the socket timeout for reading of the request content
0862: this .controlSocket
0863: .setSoTimeout(this .socketTimeout);
0864:
0865: // exec command and return value
0866: Object commandMethod = this .commandObjMethodCache
0867: .get(reqProtocol + "_" + reqCmd);
0868: if (commandMethod == null) {
0869: try {
0870: commandMethod = this .commandObj
0871: .getClass().getMethod(reqCmd,
0872: stringType);
0873: this .commandObjMethodCache.put(
0874: reqProtocol + "_" + reqCmd,
0875: commandMethod);
0876: } catch (NoSuchMethodException noMethod) {
0877: commandMethod = this .commandObj
0878: .getClass().getMethod(
0879: "UNKNOWN", stringType);
0880: stringParameter[0] = this .request
0881: .trim();
0882: }
0883: }
0884: //long commandStart = System.currentTimeMillis();
0885: result = ((Method) commandMethod).invoke(
0886: this .commandObj, stringParameter);
0887: //announceMoreExecTime(commandStart - System.currentTimeMillis()); // shall be negative!
0888: //log.logDebug("* session " + handle + " completed command '" + request + "'. time = " + (System.currentTimeMillis() - handle));
0889: this .out.flush();
0890: if (result == null) {
0891: /*
0892: log(2, true, "(NULL RETURNED/STREAM PASSED)");
0893: */
0894: } else if (result instanceof Boolean) {
0895: if (((Boolean) result)
0896: .equals(TERMINATE_CONNECTION))
0897: break;
0898:
0899: /*
0900: * setting timeout to a very high level.
0901: * this is needed because of persistent connection
0902: * support.
0903: */
0904: if (!this .controlSocket.isClosed())
0905: this .controlSocket
0906: .setSoTimeout(30 * 60 * 1000);
0907: } else if (result instanceof String) {
0908: if (((String) result).startsWith("!")) {
0909: result = ((String) result).substring(1);
0910: terminate = true;
0911: }
0912: writeLine((String) result);
0913: } else if (result instanceof InputStream) {
0914: String tmp = send(this .out,
0915: (InputStream) result);
0916: if ((tmp.length() > 4)
0917: && (tmp.toUpperCase()
0918: .startsWith("PASS"))) {
0919: log(true, "PASS ********");
0920: } else {
0921: log(true, tmp);
0922: }
0923: tmp = null;
0924: }
0925: if (terminate)
0926: break;
0927:
0928: } catch (InvocationTargetException ite) {
0929: System.out.println("ERROR A "
0930: + this .userAddress.getHostAddress());
0931: // we extract a target exception and let the thread survive
0932: writeLine(this .commandObj.error(ite
0933: .getTargetException()));
0934: } catch (NoSuchMethodException nsme) {
0935: System.out.println("ERROR B "
0936: + this .userAddress.getHostAddress());
0937: if (!this .userAddress.isSiteLocalAddress()) {
0938: if (serverCore.this .denyHost != null) {
0939: serverCore.this .denyHost.put(
0940: ("" + this .userAddress
0941: .getHostAddress()),
0942: "deny"); // block client: hacker attempt
0943: }
0944: }
0945: break;
0946: // the client requested a command that does not exist
0947: //Object[] errorParameter = { nsme };
0948: //writeLine((String) error.invoke(this.cmdObject, errorParameter));
0949: } catch (IllegalAccessException iae) {
0950: System.out.println("ERROR C "
0951: + this .userAddress.getHostAddress());
0952: // wrong parameters: this an only be an internal problem
0953: writeLine(this .commandObj.error(iae));
0954: } catch (java.lang.ClassCastException e) {
0955: System.out.println("ERROR D "
0956: + this .userAddress.getHostAddress());
0957: // ??
0958: writeLine(this .commandObj.error(e));
0959: } catch (Exception e) {
0960: System.out.println("ERROR E "
0961: + this .userAddress.getHostAddress());
0962: // whatever happens: the thread has to survive!
0963: writeLine("UNKNOWN REASON:"
0964: + this .commandObj.error(e));
0965: }
0966: } // end of while
0967: } /* catch (java.lang.ClassNotFoundException e) {
0968: System.out.println("Internal error: Wrapper class not found: " + e.getMessage());
0969: System.exit(0);
0970: } */catch (java.io.IOException e) {
0971: // connection interruption: more or less normal
0972: }
0973: //announceMoreExecTime(System.currentTimeMillis() - this.start);
0974: }
0975:
0976: public boolean isSSL() {
0977: return this .controlSocket instanceof SSLSocket;
0978: }
0979:
0980: }
0981:
0982: /**
0983: * Read a line from a protocol stream (HTTP/ICAP) and do some
0984: * pre-processing (check validity, strip line endings).
0985: * <br>
0986: * Illegal control characters will be stripped from the result.
0987: * Besides the valid line ending CRLF a single LF is treated as a
0988: * line ending as well to avoid errors with buggy server.
0989: *
0990: * @param pbis The stream to read from.
0991: * @param maxSize maximum number of bytes to read in one run.
0992: * @param logerr log error messages if true, be silent otherwise.
0993: *
0994: * @return A byte array representing one line of the input or
0995: * <code>null</null> if EOS reached.
0996: */
0997: public static byte[] receive(PushbackInputStream pbis, int maxSize,
0998: boolean logerr) {
0999:
1000: // reuse an existing linebuffer
1001: serverByteBuffer readLineBuffer = new serverByteBuffer(80);
1002:
1003: int bufferSize = 0, b = 0;
1004: try {
1005: // catch bytes until line end or illegal character reached or buffer full
1006: // resulting readLineBuffer doesn't include CRLF or illegal control chars
1007: while (bufferSize < maxSize) {
1008: b = pbis.read();
1009:
1010: if ((b > 31 && b != 127) || b == HT) {
1011: // add legal chars to the result
1012: readLineBuffer.append(b);
1013: bufferSize++;
1014: } else if (b == CR) {
1015: // possible beginning of CRLF, check following byte
1016: b = pbis.read();
1017: if (b == LF) {
1018: // line end catched: break the loop
1019: break;
1020: } else if (b >= 0) {
1021: // no line end: push back the byte, ignore the CR
1022: pbis.unread(b);
1023: }
1024: } else if (b == LF || b < 0) {
1025: // LF without precedent CR: treat as line end of broken servers
1026: // b < 0: EOS
1027: break;
1028: }
1029: }
1030:
1031: // EOS
1032: if (bufferSize == 0 && b == -1)
1033: return null;
1034: return readLineBuffer.getBytes();
1035: } catch (ClosedByInterruptException e) {
1036: if (logerr)
1037: serverLog.logSevere("SERVER",
1038: "receive interrupted - timeout");
1039: return null;
1040: } catch (IOException e) {
1041: if (logerr)
1042: serverLog.logSevere("SERVER",
1043: "receive interrupted - exception 2 = "
1044: + e.getMessage());
1045: return null;
1046: }
1047: }
1048:
1049: public static void send(OutputStream os, String buf)
1050: throws IOException {
1051: os.write(buf.getBytes());
1052: // TODO make sure there was no reason to add this additional newline
1053: //os.write(CRLF);
1054: os.flush();
1055: }
1056:
1057: public static void send(OutputStream os, byte[] buf)
1058: throws IOException {
1059: os.write(buf);
1060: os.write(CRLF);
1061: os.flush();
1062: }
1063:
1064: public static String send(OutputStream os, InputStream is)
1065: throws IOException {
1066: int bufferSize = is.available();
1067: byte[] buffer = new byte[((bufferSize < 1) || (bufferSize > 4096)) ? 4096
1068: : bufferSize];
1069: int l;
1070: while ((l = is.read(buffer)) > 0) {
1071: os.write(buffer, 0, l);
1072: }
1073: os.write(CRLF);
1074: os.flush();
1075: if (bufferSize > 80)
1076: return "<LONG STREAM>";
1077: else
1078: return new String(buffer);
1079: }
1080:
1081: protected void finalize() throws Throwable {
1082: super .finalize();
1083: }
1084:
1085: public static final void checkInterruption()
1086: throws InterruptedException {
1087: Thread currentThread = Thread.currentThread();
1088: if (currentThread.isInterrupted())
1089: throw new InterruptedException();
1090: if ((currentThread instanceof serverCore.Session)
1091: && ((serverCore.Session) currentThread).isStopped())
1092: throw new InterruptedException();
1093: }
1094:
1095: public void reconnect() {
1096: this .reconnect(5000);
1097: }
1098:
1099: public void reconnect(int delay) {
1100: Thread restart = new Restarter();
1101: restart.start();
1102: }
1103:
1104: // restarting the serverCore
1105: public class Restarter extends Thread {
1106: public serverCore theServerCore = null;
1107: public int delay = 5000;
1108:
1109: public void run() {
1110: // waiting for a while
1111: try {
1112: Thread.sleep(delay);
1113: } catch (InterruptedException e) {
1114: // TODO Auto-generated catch block
1115: e.printStackTrace();
1116: }
1117:
1118: // signaling restart
1119: serverCore.this .forceRestart = true;
1120:
1121: // closing socket to notify the thread
1122: serverCore.this .close();
1123: }
1124: }
1125:
1126: private SSLSocketFactory initSSLFactory() {
1127:
1128: // getting the keystore file name
1129: String keyStoreFileName = this .switchboard.getConfig(
1130: "keyStore", "").trim();
1131:
1132: // getting the keystore pwd
1133: String keyStorePwd = this .switchboard.getConfig(
1134: "keyStorePassword", "").trim();
1135:
1136: // take a look if we have something to import
1137: String pkcs12ImportFile = this .switchboard.getConfig(
1138: "pkcs12ImportFile", "").trim();
1139: if (pkcs12ImportFile.length() > 0) {
1140: this .log.logInfo("Import certificates from import file '"
1141: + pkcs12ImportFile + "'.");
1142:
1143: try {
1144: // getting the password
1145: String pkcs12ImportPwd = this .switchboard.getConfig(
1146: "pkcs12ImportPwd", "").trim();
1147:
1148: // creating tool to import cert
1149: PKCS12Tool pkcsTool = new PKCS12Tool(pkcs12ImportFile,
1150: pkcs12ImportPwd);
1151:
1152: // creating a new keystore file
1153: if (keyStoreFileName.length() == 0) {
1154: // using the default keystore name
1155: keyStoreFileName = "DATA/SETTINGS/myPeerKeystore";
1156:
1157: // creating an empty java keystore
1158: KeyStore ks = KeyStore.getInstance("JKS");
1159: ks.load(null, keyStorePwd.toCharArray());
1160: FileOutputStream ksOut = new FileOutputStream(
1161: keyStoreFileName);
1162: ks.store(ksOut, keyStorePwd.toCharArray());
1163: ksOut.close();
1164:
1165: // storing path to keystore into config file
1166: this .switchboard.setConfig("keyStore",
1167: keyStoreFileName);
1168: }
1169:
1170: // importing certificate
1171: pkcsTool.importToJKS(keyStoreFileName, keyStorePwd);
1172:
1173: // removing entries from config file
1174: this .switchboard.setConfig("pkcs12ImportFile", "");
1175: this .switchboard.setConfig("keyStorePassword", "");
1176:
1177: // deleting original import file
1178: // TODO: should we do this
1179:
1180: } catch (Exception e) {
1181: this .log.logSevere(
1182: "Unable to import certificate from import file '"
1183: + pkcs12ImportFile + "'.", e);
1184: }
1185: } else if (keyStoreFileName.length() == 0)
1186: return null;
1187:
1188: // get the ssl context
1189: try {
1190: this .log.logInfo("Initializing SSL support ...");
1191:
1192: // creating a new keystore instance of type (java key store)
1193: this .log.logFine("Initializing keystore ...");
1194: KeyStore ks = KeyStore.getInstance("JKS");
1195:
1196: // loading keystore data from file
1197: this .log.logFine("Loading keystore file "
1198: + keyStoreFileName);
1199: FileInputStream stream = new FileInputStream(
1200: keyStoreFileName);
1201: ks.load(stream, keyStorePwd.toCharArray());
1202:
1203: // creating a keystore factory
1204: this .log.logFine("Initializing key manager factory ...");
1205: KeyManagerFactory kmf = KeyManagerFactory
1206: .getInstance(KeyManagerFactory
1207: .getDefaultAlgorithm());
1208: kmf.init(ks, keyStorePwd.toCharArray());
1209:
1210: // initializing the ssl context
1211: this .log.logFine("Initializing SSL context ...");
1212: SSLContext sslcontext = SSLContext.getInstance("TLS");
1213: sslcontext.init(kmf.getKeyManagers(), null, null);
1214:
1215: SSLSocketFactory factory = sslcontext.getSocketFactory();
1216: this .log.logInfo("SSL support initialized successfully");
1217: return factory;
1218: } catch (Exception e) {
1219: String errorMsg = "FATAL ERROR: Unable to initialize the SSL Socket factory. "
1220: + e.getMessage();
1221: this .log.logSevere(errorMsg);
1222: System.out.println(errorMsg);
1223: System.exit(0);
1224: return null;
1225: }
1226: }
1227:
1228: public Socket negotiateSSL(Socket sock) throws Exception {
1229:
1230: SSLSocket sslsock;
1231:
1232: try {
1233: sslsock = (SSLSocket) this .sslSocketFactory.createSocket(
1234: sock, sock.getInetAddress().getHostName(), sock
1235: .getPort(), true);
1236:
1237: sslsock
1238: .addHandshakeCompletedListener(new HandshakeCompletedListener() {
1239: public void handshakeCompleted(
1240: HandshakeCompletedEvent event) {
1241: System.out.println("Handshake finished!");
1242: System.out.println("\t CipherSuite:"
1243: + event.getCipherSuite());
1244: System.out.println("\t SessionId "
1245: + event.getSession());
1246: System.out.println("\t PeerHost "
1247: + event.getSession().getPeerHost());
1248: }
1249: });
1250:
1251: sslsock.setUseClientMode(false);
1252: String[] suites = sslsock.getSupportedCipherSuites();
1253: sslsock.setEnabledCipherSuites(suites);
1254: // start handshake
1255: sslsock.startHandshake();
1256:
1257: //String cipherSuite = sslsock.getSession().getCipherSuite();
1258:
1259: return sslsock;
1260: } catch (Exception e) {
1261: throw e;
1262: }
1263: }
1264: }
|