0001: /*
0002: * Licensed to the Apache Software Foundation (ASF) under one or more
0003: * contributor license agreements. See the NOTICE file distributed with
0004: * this work for additional information regarding copyright ownership.
0005: * The ASF licenses this file to You under the Apache License, Version 2.0
0006: * (the "License"); you may not use this file except in compliance with
0007: * the License. You may obtain a copy of the License at
0008: *
0009: * http://www.apache.org/licenses/LICENSE-2.0
0010: *
0011: * Unless required by applicable law or agreed to in writing, software
0012: * distributed under the License is distributed on an "AS IS" BASIS,
0013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014: * See the License for the specific language governing permissions and
0015: * limitations under the License.
0016: */
0017:
0018: package org.apache.tomcat.util.net;
0019:
0020: import java.net.InetAddress;
0021: import java.util.ArrayList;
0022: import java.util.HashMap;
0023: import java.util.concurrent.Executor;
0024:
0025: import org.apache.juli.logging.Log;
0026: import org.apache.juli.logging.LogFactory;
0027: import org.apache.tomcat.jni.Address;
0028: import org.apache.tomcat.jni.Error;
0029: import org.apache.tomcat.jni.File;
0030: import org.apache.tomcat.jni.Library;
0031: import org.apache.tomcat.jni.OS;
0032: import org.apache.tomcat.jni.Poll;
0033: import org.apache.tomcat.jni.Pool;
0034: import org.apache.tomcat.jni.SSL;
0035: import org.apache.tomcat.jni.SSLContext;
0036: import org.apache.tomcat.jni.SSLSocket;
0037: import org.apache.tomcat.jni.Socket;
0038: import org.apache.tomcat.jni.Status;
0039: import org.apache.tomcat.util.res.StringManager;
0040:
0041: /**
0042: * APR tailored thread pool, providing the following services:
0043: * <ul>
0044: * <li>Socket acceptor thread</li>
0045: * <li>Socket poller thread</li>
0046: * <li>Sendfile thread</li>
0047: * <li>Worker threads pool</li>
0048: * </ul>
0049: *
0050: * When switching to Java 5, there's an opportunity to use the virtual
0051: * machine's thread pool.
0052: *
0053: * @author Mladen Turk
0054: * @author Remy Maucherat
0055: */
0056: public class AprEndpoint {
0057:
0058: // -------------------------------------------------------------- Constants
0059:
0060: protected static Log log = LogFactory.getLog(AprEndpoint.class);
0061:
0062: protected static StringManager sm = StringManager
0063: .getManager("org.apache.tomcat.util.net.res");
0064:
0065: /**
0066: * The Request attribute key for the cipher suite.
0067: */
0068: public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
0069:
0070: /**
0071: * The Request attribute key for the key size.
0072: */
0073: public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
0074:
0075: /**
0076: * The Request attribute key for the client certificate chain.
0077: */
0078: public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
0079:
0080: /**
0081: * The Request attribute key for the session id.
0082: * This one is a Tomcat extension to the Servlet spec.
0083: */
0084: public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
0085:
0086: // ----------------------------------------------------------------- Fields
0087:
0088: /**
0089: * Available workers.
0090: */
0091: protected WorkerStack workers = null;
0092:
0093: /**
0094: * Running state of the endpoint.
0095: */
0096: protected volatile boolean running = false;
0097:
0098: /**
0099: * Will be set to true whenever the endpoint is paused.
0100: */
0101: protected volatile boolean paused = false;
0102:
0103: /**
0104: * Track the initialization state of the endpoint.
0105: */
0106: protected boolean initialized = false;
0107:
0108: /**
0109: * Current worker threads busy count.
0110: */
0111: protected int curThreadsBusy = 0;
0112:
0113: /**
0114: * Current worker threads count.
0115: */
0116: protected int curThreads = 0;
0117:
0118: /**
0119: * Sequence number used to generate thread names.
0120: */
0121: protected int sequence = 0;
0122:
0123: /**
0124: * Root APR memory pool.
0125: */
0126: protected long rootPool = 0;
0127:
0128: /**
0129: * Server socket "pointer".
0130: */
0131: protected long serverSock = 0;
0132:
0133: /**
0134: * APR memory pool for the server socket.
0135: */
0136: protected long serverSockPool = 0;
0137:
0138: /**
0139: * SSL context.
0140: */
0141: protected long sslContext = 0;
0142:
0143: /**
0144: * Defer accept.
0145: */
0146: protected boolean deferAccept = true;
0147:
0148: // ------------------------------------------------------------- Properties
0149:
0150: /**
0151: * External Executor based thread pool.
0152: */
0153: protected Executor executor = null;
0154:
0155: public void setExecutor(Executor executor) {
0156: this .executor = executor;
0157: }
0158:
0159: public Executor getExecutor() {
0160: return executor;
0161: }
0162:
0163: /**
0164: * Maximum amount of worker threads.
0165: */
0166: protected int maxThreads = 40;
0167:
0168: public void setMaxThreads(int maxThreads) {
0169: this .maxThreads = maxThreads;
0170: }
0171:
0172: public int getMaxThreads() {
0173: return maxThreads;
0174: }
0175:
0176: /**
0177: * Priority of the acceptor and poller threads.
0178: */
0179: protected int threadPriority = Thread.NORM_PRIORITY;
0180:
0181: public void setThreadPriority(int threadPriority) {
0182: this .threadPriority = threadPriority;
0183: }
0184:
0185: public int getThreadPriority() {
0186: return threadPriority;
0187: }
0188:
0189: /**
0190: * Size of the socket poller.
0191: */
0192: protected int pollerSize = 8 * 1024;
0193:
0194: public void setPollerSize(int pollerSize) {
0195: this .pollerSize = pollerSize;
0196: }
0197:
0198: public int getPollerSize() {
0199: return pollerSize;
0200: }
0201:
0202: /**
0203: * Size of the sendfile (= concurrent files which can be served).
0204: */
0205: protected int sendfileSize = 1 * 1024;
0206:
0207: public void setSendfileSize(int sendfileSize) {
0208: this .sendfileSize = sendfileSize;
0209: }
0210:
0211: public int getSendfileSize() {
0212: return sendfileSize;
0213: }
0214:
0215: /**
0216: * Server socket port.
0217: */
0218: protected int port;
0219:
0220: public int getPort() {
0221: return port;
0222: }
0223:
0224: public void setPort(int port) {
0225: this .port = port;
0226: }
0227:
0228: /**
0229: * Address for the server socket.
0230: */
0231: protected InetAddress address;
0232:
0233: public InetAddress getAddress() {
0234: return address;
0235: }
0236:
0237: public void setAddress(InetAddress address) {
0238: this .address = address;
0239: }
0240:
0241: /**
0242: * Handling of accepted sockets.
0243: */
0244: protected Handler handler = null;
0245:
0246: public void setHandler(Handler handler) {
0247: this .handler = handler;
0248: }
0249:
0250: public Handler getHandler() {
0251: return handler;
0252: }
0253:
0254: /**
0255: * Allows the server developer to specify the backlog that
0256: * should be used for server sockets. By default, this value
0257: * is 100.
0258: */
0259: protected int backlog = 100;
0260:
0261: public void setBacklog(int backlog) {
0262: if (backlog > 0)
0263: this .backlog = backlog;
0264: }
0265:
0266: public int getBacklog() {
0267: return backlog;
0268: }
0269:
0270: /**
0271: * Socket TCP no delay.
0272: */
0273: protected boolean tcpNoDelay = false;
0274:
0275: public boolean getTcpNoDelay() {
0276: return tcpNoDelay;
0277: }
0278:
0279: public void setTcpNoDelay(boolean tcpNoDelay) {
0280: this .tcpNoDelay = tcpNoDelay;
0281: }
0282:
0283: /**
0284: * Socket linger.
0285: */
0286: protected int soLinger = 100;
0287:
0288: public int getSoLinger() {
0289: return soLinger;
0290: }
0291:
0292: public void setSoLinger(int soLinger) {
0293: this .soLinger = soLinger;
0294: }
0295:
0296: /**
0297: * Socket timeout.
0298: */
0299: protected int soTimeout = -1;
0300:
0301: public int getSoTimeout() {
0302: return soTimeout;
0303: }
0304:
0305: public void setSoTimeout(int soTimeout) {
0306: this .soTimeout = soTimeout;
0307: }
0308:
0309: /**
0310: * Keep-Alive timeout.
0311: */
0312: protected int keepAliveTimeout = -1;
0313:
0314: public int getKeepAliveTimeout() {
0315: return keepAliveTimeout;
0316: }
0317:
0318: public void setKeepAliveTimeout(int keepAliveTimeout) {
0319: this .keepAliveTimeout = keepAliveTimeout;
0320: }
0321:
0322: /**
0323: * Poll interval, in microseconds. The smaller the value, the more CPU the poller
0324: * will use, but the more responsive to activity it will be.
0325: */
0326: protected int pollTime = 2000;
0327:
0328: public int getPollTime() {
0329: return pollTime;
0330: }
0331:
0332: public void setPollTime(int pollTime) {
0333: if (pollTime > 0) {
0334: this .pollTime = pollTime;
0335: }
0336: }
0337:
0338: /**
0339: * The default is true - the created threads will be
0340: * in daemon mode. If set to false, the control thread
0341: * will not be daemon - and will keep the process alive.
0342: */
0343: protected boolean daemon = true;
0344:
0345: public void setDaemon(boolean b) {
0346: daemon = b;
0347: }
0348:
0349: public boolean getDaemon() {
0350: return daemon;
0351: }
0352:
0353: /**
0354: * Name of the thread pool, which will be used for naming child threads.
0355: */
0356: protected String name = "TP";
0357:
0358: public void setName(String name) {
0359: this .name = name;
0360: }
0361:
0362: public String getName() {
0363: return name;
0364: }
0365:
0366: /**
0367: * Use endfile for sending static files.
0368: */
0369: protected boolean useSendfile = Library.APR_HAS_SENDFILE;
0370:
0371: public void setUseSendfile(boolean useSendfile) {
0372: this .useSendfile = useSendfile;
0373: }
0374:
0375: public boolean getUseSendfile() {
0376: return useSendfile;
0377: }
0378:
0379: /**
0380: * Allow comet request handling.
0381: */
0382: protected boolean useComet = true;
0383:
0384: public void setUseComet(boolean useComet) {
0385: this .useComet = useComet;
0386: }
0387:
0388: public boolean getUseComet() {
0389: return useComet;
0390: }
0391:
0392: /**
0393: * Acceptor thread count.
0394: */
0395: protected int acceptorThreadCount = 0;
0396:
0397: public void setAcceptorThreadCount(int acceptorThreadCount) {
0398: this .acceptorThreadCount = acceptorThreadCount;
0399: }
0400:
0401: public int getAcceptorThreadCount() {
0402: return acceptorThreadCount;
0403: }
0404:
0405: /**
0406: * Sendfile thread count.
0407: */
0408: protected int sendfileThreadCount = 0;
0409:
0410: public void setSendfileThreadCount(int sendfileThreadCount) {
0411: this .sendfileThreadCount = sendfileThreadCount;
0412: }
0413:
0414: public int getSendfileThreadCount() {
0415: return sendfileThreadCount;
0416: }
0417:
0418: /**
0419: * Poller thread count.
0420: */
0421: protected int pollerThreadCount = 0;
0422:
0423: public void setPollerThreadCount(int pollerThreadCount) {
0424: this .pollerThreadCount = pollerThreadCount;
0425: }
0426:
0427: public int getPollerThreadCount() {
0428: return pollerThreadCount;
0429: }
0430:
0431: /**
0432: * The socket poller.
0433: */
0434: protected Poller[] pollers = null;
0435: protected int pollerRoundRobin = 0;
0436:
0437: public Poller getPoller() {
0438: pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
0439: return pollers[pollerRoundRobin];
0440: }
0441:
0442: /**
0443: * The socket poller used for Comet support.
0444: */
0445: protected Poller[] cometPollers = null;
0446: protected int cometPollerRoundRobin = 0;
0447:
0448: public Poller getCometPoller() {
0449: cometPollerRoundRobin = (cometPollerRoundRobin + 1)
0450: % cometPollers.length;
0451: return cometPollers[cometPollerRoundRobin];
0452: }
0453:
0454: /**
0455: * The static file sender.
0456: */
0457: protected Sendfile[] sendfiles = null;
0458: protected int sendfileRoundRobin = 0;
0459:
0460: public Sendfile getSendfile() {
0461: sendfileRoundRobin = (sendfileRoundRobin + 1)
0462: % sendfiles.length;
0463: return sendfiles[sendfileRoundRobin];
0464: }
0465:
0466: /**
0467: * Dummy maxSpareThreads property.
0468: */
0469: public int getMaxSpareThreads() {
0470: return 0;
0471: }
0472:
0473: /**
0474: * Dummy minSpareThreads property.
0475: */
0476: public int getMinSpareThreads() {
0477: return 0;
0478: }
0479:
0480: /**
0481: * SSL engine.
0482: */
0483: protected boolean SSLEnabled = false;
0484:
0485: public boolean isSSLEnabled() {
0486: return SSLEnabled;
0487: }
0488:
0489: public void setSSLEnabled(boolean SSLEnabled) {
0490: this .SSLEnabled = SSLEnabled;
0491: }
0492:
0493: /**
0494: * SSL protocols.
0495: */
0496: protected String SSLProtocol = "all";
0497:
0498: public String getSSLProtocol() {
0499: return SSLProtocol;
0500: }
0501:
0502: public void setSSLProtocol(String SSLProtocol) {
0503: this .SSLProtocol = SSLProtocol;
0504: }
0505:
0506: /**
0507: * SSL password (if a cert is encrypted, and no password has been provided, a callback
0508: * will ask for a password).
0509: */
0510: protected String SSLPassword = null;
0511:
0512: public String getSSLPassword() {
0513: return SSLPassword;
0514: }
0515:
0516: public void setSSLPassword(String SSLPassword) {
0517: this .SSLPassword = SSLPassword;
0518: }
0519:
0520: /**
0521: * SSL cipher suite.
0522: */
0523: protected String SSLCipherSuite = "ALL";
0524:
0525: public String getSSLCipherSuite() {
0526: return SSLCipherSuite;
0527: }
0528:
0529: public void setSSLCipherSuite(String SSLCipherSuite) {
0530: this .SSLCipherSuite = SSLCipherSuite;
0531: }
0532:
0533: /**
0534: * SSL certificate file.
0535: */
0536: protected String SSLCertificateFile = null;
0537:
0538: public String getSSLCertificateFile() {
0539: return SSLCertificateFile;
0540: }
0541:
0542: public void setSSLCertificateFile(String SSLCertificateFile) {
0543: this .SSLCertificateFile = SSLCertificateFile;
0544: }
0545:
0546: /**
0547: * SSL certificate key file.
0548: */
0549: protected String SSLCertificateKeyFile = null;
0550:
0551: public String getSSLCertificateKeyFile() {
0552: return SSLCertificateKeyFile;
0553: }
0554:
0555: public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) {
0556: this .SSLCertificateKeyFile = SSLCertificateKeyFile;
0557: }
0558:
0559: /**
0560: * SSL certificate chain file.
0561: */
0562: protected String SSLCertificateChainFile = null;
0563:
0564: public String getSSLCertificateChainFile() {
0565: return SSLCertificateChainFile;
0566: }
0567:
0568: public void setSSLCertificateChainFile(
0569: String SSLCertificateChainFile) {
0570: this .SSLCertificateChainFile = SSLCertificateChainFile;
0571: }
0572:
0573: /**
0574: * SSL CA certificate path.
0575: */
0576: protected String SSLCACertificatePath = null;
0577:
0578: public String getSSLCACertificatePath() {
0579: return SSLCACertificatePath;
0580: }
0581:
0582: public void setSSLCACertificatePath(String SSLCACertificatePath) {
0583: this .SSLCACertificatePath = SSLCACertificatePath;
0584: }
0585:
0586: /**
0587: * SSL CA certificate file.
0588: */
0589: protected String SSLCACertificateFile = null;
0590:
0591: public String getSSLCACertificateFile() {
0592: return SSLCACertificateFile;
0593: }
0594:
0595: public void setSSLCACertificateFile(String SSLCACertificateFile) {
0596: this .SSLCACertificateFile = SSLCACertificateFile;
0597: }
0598:
0599: /**
0600: * SSL CA revocation path.
0601: */
0602: protected String SSLCARevocationPath = null;
0603:
0604: public String getSSLCARevocationPath() {
0605: return SSLCARevocationPath;
0606: }
0607:
0608: public void setSSLCARevocationPath(String SSLCARevocationPath) {
0609: this .SSLCARevocationPath = SSLCARevocationPath;
0610: }
0611:
0612: /**
0613: * SSL CA revocation file.
0614: */
0615: protected String SSLCARevocationFile = null;
0616:
0617: public String getSSLCARevocationFile() {
0618: return SSLCARevocationFile;
0619: }
0620:
0621: public void setSSLCARevocationFile(String SSLCARevocationFile) {
0622: this .SSLCARevocationFile = SSLCARevocationFile;
0623: }
0624:
0625: /**
0626: * SSL verify client.
0627: */
0628: protected String SSLVerifyClient = "none";
0629:
0630: public String getSSLVerifyClient() {
0631: return SSLVerifyClient;
0632: }
0633:
0634: public void setSSLVerifyClient(String SSLVerifyClient) {
0635: this .SSLVerifyClient = SSLVerifyClient;
0636: }
0637:
0638: /**
0639: * SSL verify depth.
0640: */
0641: protected int SSLVerifyDepth = 10;
0642:
0643: public int getSSLVerifyDepth() {
0644: return SSLVerifyDepth;
0645: }
0646:
0647: public void setSSLVerifyDepth(int SSLVerifyDepth) {
0648: this .SSLVerifyDepth = SSLVerifyDepth;
0649: }
0650:
0651: // --------------------------------------------------------- Public Methods
0652:
0653: /**
0654: * Number of keepalive sockets.
0655: */
0656: public int getKeepAliveCount() {
0657: if (pollers == null) {
0658: return 0;
0659: } else {
0660: int keepAliveCount = 0;
0661: for (int i = 0; i < pollers.length; i++) {
0662: keepAliveCount += pollers[i].getKeepAliveCount();
0663: }
0664: return keepAliveCount;
0665: }
0666: }
0667:
0668: /**
0669: * Number of sendfile sockets.
0670: */
0671: public int getSendfileCount() {
0672: if (sendfiles == null) {
0673: return 0;
0674: } else {
0675: int sendfileCount = 0;
0676: for (int i = 0; i < sendfiles.length; i++) {
0677: sendfileCount += sendfiles[i].getSendfileCount();
0678: }
0679: return sendfileCount;
0680: }
0681: }
0682:
0683: /**
0684: * Return the amount of threads that are managed by the pool.
0685: *
0686: * @return the amount of threads that are managed by the pool
0687: */
0688: public int getCurrentThreadCount() {
0689: return curThreads;
0690: }
0691:
0692: /**
0693: * Return the amount of threads currently busy.
0694: *
0695: * @return the amount of threads currently busy
0696: */
0697: public int getCurrentThreadsBusy() {
0698: return curThreadsBusy;
0699: }
0700:
0701: /**
0702: * Return the state of the endpoint.
0703: *
0704: * @return true if the endpoint is running, false otherwise
0705: */
0706: public boolean isRunning() {
0707: return running;
0708: }
0709:
0710: /**
0711: * Return the state of the endpoint.
0712: *
0713: * @return true if the endpoint is paused, false otherwise
0714: */
0715: public boolean isPaused() {
0716: return paused;
0717: }
0718:
0719: // ----------------------------------------------- Public Lifecycle Methods
0720:
0721: /**
0722: * Initialize the endpoint.
0723: */
0724: public void init() throws Exception {
0725:
0726: if (initialized)
0727: return;
0728:
0729: // Create the root APR memory pool
0730: rootPool = Pool.create(0);
0731: // Create the pool for the server socket
0732: serverSockPool = Pool.create(rootPool);
0733: // Create the APR address that will be bound
0734: String addressStr = null;
0735: if (address == null) {
0736: addressStr = null;
0737: } else {
0738: addressStr = address.getHostAddress();
0739: }
0740: int family = Socket.APR_INET;
0741: if (Library.APR_HAVE_IPV6) {
0742: if (addressStr == null) {
0743: if (!OS.IS_BSD && !OS.IS_WIN32 && !OS.IS_WIN64)
0744: family = Socket.APR_UNSPEC;
0745: } else if (addressStr.indexOf(':') >= 0) {
0746: family = Socket.APR_UNSPEC;
0747: }
0748: }
0749:
0750: long inetAddress = Address.info(addressStr, family, port, 0,
0751: rootPool);
0752: // Create the APR server socket
0753: serverSock = Socket.create(family, Socket.SOCK_STREAM,
0754: Socket.APR_PROTO_TCP, rootPool);
0755: if (OS.IS_UNIX) {
0756: Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
0757: }
0758: // Deal with the firewalls that tend to drop the inactive sockets
0759: Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
0760: // Bind the server socket
0761: int ret = Socket.bind(serverSock, inetAddress);
0762: if (ret != 0) {
0763: throw new Exception(sm.getString("endpoint.init.bind", ""
0764: + ret, Error.strerror(ret)));
0765: }
0766: // Start listening on the server socket
0767: ret = Socket.listen(serverSock, backlog);
0768: if (ret != 0) {
0769: throw new Exception(sm.getString("endpoint.init.listen", ""
0770: + ret, Error.strerror(ret)));
0771: }
0772: if (OS.IS_WIN32 || OS.IS_WIN64) {
0773: // On Windows set the reuseaddr flag after the bind/listen
0774: Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
0775: }
0776:
0777: // Sendfile usage on systems which don't support it cause major problems
0778: if (useSendfile && !Library.APR_HAS_SENDFILE) {
0779: useSendfile = false;
0780: }
0781:
0782: // Initialize thread count defaults for acceptor, poller and sendfile
0783: if (acceptorThreadCount == 0) {
0784: // FIXME: Doesn't seem to work that well with multiple accept threads
0785: acceptorThreadCount = 1;
0786: }
0787: if (pollerThreadCount == 0) {
0788: if ((OS.IS_WIN32 || OS.IS_WIN64) && (pollerSize > 1024)) {
0789: // The maximum per poller to get reasonable performance is 1024
0790: pollerThreadCount = pollerSize / 1024;
0791: // Adjust poller size so that it won't reach the limit
0792: pollerSize = pollerSize - (pollerSize % 1024);
0793: } else {
0794: // No explicit poller size limitation
0795: pollerThreadCount = 1;
0796: }
0797: }
0798: if (sendfileThreadCount == 0) {
0799: if ((OS.IS_WIN32 || OS.IS_WIN64) && (sendfileSize > 1024)) {
0800: // The maximum per poller to get reasonable performance is 1024
0801: sendfileThreadCount = sendfileSize / 1024;
0802: // Adjust poller size so that it won't reach the limit
0803: sendfileSize = sendfileSize - (sendfileSize % 1024);
0804: } else {
0805: // No explicit poller size limitation
0806: // FIXME: Default to one per CPU ?
0807: sendfileThreadCount = 1;
0808: }
0809: }
0810:
0811: // Delay accepting of new connections until data is available
0812: // Only Linux kernels 2.4 + have that implemented
0813: // on other platforms this call is noop and will return APR_ENOTIMPL.
0814: if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) {
0815: deferAccept = false;
0816: }
0817:
0818: // Initialize SSL if needed
0819: if (SSLEnabled) {
0820:
0821: // SSL protocol
0822: int value = SSL.SSL_PROTOCOL_ALL;
0823: if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {
0824: value = SSL.SSL_PROTOCOL_SSLV2;
0825: } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {
0826: value = SSL.SSL_PROTOCOL_SSLV3;
0827: } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {
0828: value = SSL.SSL_PROTOCOL_TLSV1;
0829: } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {
0830: value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;
0831: }
0832: // Create SSL Context
0833: sslContext = SSLContext.make(rootPool, value,
0834: SSL.SSL_MODE_SERVER);
0835: // List the ciphers that the client is permitted to negotiate
0836: SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
0837: // Load Server key and certificate
0838: SSLContext.setCertificate(sslContext, SSLCertificateFile,
0839: SSLCertificateKeyFile, SSLPassword,
0840: SSL.SSL_AIDX_RSA);
0841: // Set certificate chain file
0842: SSLContext.setCertificateChainFile(sslContext,
0843: SSLCertificateChainFile, false);
0844: // Support Client Certificates
0845: SSLContext.setCACertificate(sslContext,
0846: SSLCACertificateFile, SSLCACertificatePath);
0847: // Set revocation
0848: SSLContext.setCARevocation(sslContext, SSLCARevocationFile,
0849: SSLCARevocationPath);
0850: // Client certificate verification
0851: value = SSL.SSL_CVERIFY_NONE;
0852: if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
0853: value = SSL.SSL_CVERIFY_OPTIONAL;
0854: } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
0855: value = SSL.SSL_CVERIFY_REQUIRE;
0856: } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
0857: value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
0858: }
0859: SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
0860: // For now, sendfile is not supported with SSL
0861: useSendfile = false;
0862: }
0863:
0864: initialized = true;
0865:
0866: }
0867:
0868: /**
0869: * Start the APR endpoint, creating acceptor, poller and sendfile threads.
0870: */
0871: public void start() throws Exception {
0872: // Initialize socket if not done before
0873: if (!initialized) {
0874: init();
0875: }
0876: if (!running) {
0877: running = true;
0878: paused = false;
0879:
0880: // Create worker collection
0881: if (executor == null) {
0882: workers = new WorkerStack(maxThreads);
0883: }
0884:
0885: // Start acceptor threads
0886: for (int i = 0; i < acceptorThreadCount; i++) {
0887: Thread acceptorThread = new Thread(new Acceptor(),
0888: getName() + "-Acceptor-" + i);
0889: acceptorThread.setPriority(threadPriority);
0890: acceptorThread.setDaemon(daemon);
0891: acceptorThread.start();
0892: }
0893:
0894: // Start poller threads
0895: pollers = new Poller[pollerThreadCount];
0896: for (int i = 0; i < pollerThreadCount; i++) {
0897: pollers[i] = new Poller(false);
0898: pollers[i].init();
0899: Thread pollerThread = new Thread(pollers[i], getName()
0900: + "-Poller-" + i);
0901: pollerThread.setPriority(threadPriority);
0902: pollerThread.setDaemon(true);
0903: pollerThread.start();
0904: }
0905:
0906: // Start comet poller threads
0907: cometPollers = new Poller[pollerThreadCount];
0908: for (int i = 0; i < pollerThreadCount; i++) {
0909: cometPollers[i] = new Poller(true);
0910: cometPollers[i].init();
0911: Thread pollerThread = new Thread(cometPollers[i],
0912: getName() + "-CometPoller-" + i);
0913: pollerThread.setPriority(threadPriority);
0914: pollerThread.setDaemon(true);
0915: pollerThread.start();
0916: }
0917:
0918: // Start sendfile threads
0919: if (useSendfile) {
0920: sendfiles = new Sendfile[sendfileThreadCount];
0921: for (int i = 0; i < sendfileThreadCount; i++) {
0922: sendfiles[i] = new Sendfile();
0923: sendfiles[i].init();
0924: Thread sendfileThread = new Thread(sendfiles[i],
0925: getName() + "-Sendfile-" + i);
0926: sendfileThread.setPriority(threadPriority);
0927: sendfileThread.setDaemon(true);
0928: sendfileThread.start();
0929: }
0930: }
0931: }
0932: }
0933:
0934: /**
0935: * Pause the endpoint, which will make it stop accepting new sockets.
0936: */
0937: public void pause() {
0938: if (running && !paused) {
0939: paused = true;
0940: unlockAccept();
0941: }
0942: }
0943:
0944: /**
0945: * Resume the endpoint, which will make it start accepting new sockets
0946: * again.
0947: */
0948: public void resume() {
0949: if (running) {
0950: paused = false;
0951: }
0952: }
0953:
0954: /**
0955: * Stop the endpoint. This will cause all processing threads to stop.
0956: */
0957: public void stop() {
0958: if (running) {
0959: running = false;
0960: unlockAccept();
0961: for (int i = 0; i < pollers.length; i++) {
0962: pollers[i].destroy();
0963: }
0964: pollers = null;
0965: for (int i = 0; i < cometPollers.length; i++) {
0966: cometPollers[i].destroy();
0967: }
0968: cometPollers = null;
0969: if (useSendfile) {
0970: for (int i = 0; i < sendfiles.length; i++) {
0971: sendfiles[i].destroy();
0972: }
0973: sendfiles = null;
0974: }
0975: }
0976: }
0977:
0978: /**
0979: * Deallocate APR memory pools, and close server socket.
0980: */
0981: public void destroy() throws Exception {
0982: if (running) {
0983: stop();
0984: }
0985: Pool.destroy(serverSockPool);
0986: serverSockPool = 0;
0987: // Close server socket
0988: Socket.close(serverSock);
0989: serverSock = 0;
0990: sslContext = 0;
0991: // Close all APR memory pools and resources
0992: Pool.destroy(rootPool);
0993: rootPool = 0;
0994: initialized = false;
0995: }
0996:
0997: // ------------------------------------------------------ Protected Methods
0998:
0999: /**
1000: * Get a sequence number used for thread naming.
1001: */
1002: protected int getSequence() {
1003: return sequence++;
1004: }
1005:
1006: /**
1007: * Unlock the server socket accept using a bugus connection.
1008: */
1009: protected void unlockAccept() {
1010: java.net.Socket s = null;
1011: try {
1012: // Need to create a connection to unlock the accept();
1013: if (address == null) {
1014: s = new java.net.Socket("127.0.0.1", port);
1015: } else {
1016: s = new java.net.Socket(address, port);
1017: // setting soLinger to a small value will help shutdown the
1018: // connection quicker
1019: s.setSoLinger(true, 0);
1020: }
1021: } catch (Exception e) {
1022: if (log.isDebugEnabled()) {
1023: log.debug(sm.getString("endpoint.debug.unlock", ""
1024: + port), e);
1025: }
1026: } finally {
1027: if (s != null) {
1028: try {
1029: s.close();
1030: } catch (Exception e) {
1031: // Ignore
1032: }
1033: }
1034: }
1035: }
1036:
1037: /**
1038: * Process the specified connection.
1039: */
1040: protected boolean setSocketOptions(long socket) {
1041: // Process the connection
1042: int step = 1;
1043: try {
1044:
1045: // 1: Set socket options: timeout, linger, etc
1046: if (soLinger >= 0)
1047: Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger);
1048: if (tcpNoDelay)
1049: Socket.optSet(socket, Socket.APR_TCP_NODELAY,
1050: (tcpNoDelay ? 1 : 0));
1051: if (soTimeout > 0)
1052: Socket.timeoutSet(socket, soTimeout * 1000);
1053:
1054: // 2: SSL handshake
1055: step = 2;
1056: if (sslContext != 0) {
1057: SSLSocket.attach(sslContext, socket);
1058: if (SSLSocket.handshake(socket) != 0) {
1059: if (log.isDebugEnabled()) {
1060: log.debug(sm
1061: .getString("endpoint.err.handshake")
1062: + ": " + SSL.getLastError());
1063: }
1064: return false;
1065: }
1066: }
1067:
1068: } catch (Throwable t) {
1069: if (log.isDebugEnabled()) {
1070: if (step == 2) {
1071: log
1072: .debug(
1073: sm
1074: .getString("endpoint.err.handshake"),
1075: t);
1076: } else {
1077: log.debug(sm.getString("endpoint.err.unexpected"),
1078: t);
1079: }
1080: }
1081: // Tell to close the socket
1082: return false;
1083: }
1084: return true;
1085: }
1086:
1087: /**
1088: * Create (or allocate) and return an available processor for use in
1089: * processing a specific HTTP request, if possible. If the maximum
1090: * allowed processors have already been created and are in use, return
1091: * <code>null</code> instead.
1092: */
1093: protected Worker createWorkerThread() {
1094:
1095: synchronized (workers) {
1096: if (workers.size() > 0) {
1097: curThreadsBusy++;
1098: return (workers.pop());
1099: }
1100: if ((maxThreads > 0) && (curThreads < maxThreads)) {
1101: curThreadsBusy++;
1102: return (newWorkerThread());
1103: } else {
1104: if (maxThreads < 0) {
1105: curThreadsBusy++;
1106: return (newWorkerThread());
1107: } else {
1108: return (null);
1109: }
1110: }
1111: }
1112:
1113: }
1114:
1115: /**
1116: * Create and return a new processor suitable for processing HTTP
1117: * requests and returning the corresponding responses.
1118: */
1119: protected Worker newWorkerThread() {
1120:
1121: Worker workerThread = new Worker();
1122: workerThread.start();
1123: return (workerThread);
1124:
1125: }
1126:
1127: /**
1128: * Return a new worker thread, and block while to worker is available.
1129: */
1130: protected Worker getWorkerThread() {
1131: // Allocate a new worker thread
1132: Worker workerThread = createWorkerThread();
1133: while (workerThread == null) {
1134: try {
1135: synchronized (workers) {
1136: workers.wait();
1137: }
1138: } catch (InterruptedException e) {
1139: // Ignore
1140: }
1141: workerThread = createWorkerThread();
1142: }
1143: return workerThread;
1144: }
1145:
1146: /**
1147: * Recycle the specified Processor so that it can be used again.
1148: *
1149: * @param workerThread The processor to be recycled
1150: */
1151: protected void recycleWorkerThread(Worker workerThread) {
1152: synchronized (workers) {
1153: workers.push(workerThread);
1154: curThreadsBusy--;
1155: workers.notify();
1156: }
1157: }
1158:
1159: /**
1160: * Allocate a new poller of the specified size.
1161: */
1162: protected long allocatePoller(int size, long pool, int timeout) {
1163: try {
1164: return Poll.create(size, pool, 0, timeout * 1000);
1165: } catch (Error e) {
1166: if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
1167: log.info(sm.getString("endpoint.poll.limitedpollsize",
1168: "" + size));
1169: return 0;
1170: } else {
1171: log.error(sm.getString("endpoint.poll.initfail"), e);
1172: return -1;
1173: }
1174: }
1175: }
1176:
1177: /**
1178: * Process given socket.
1179: */
1180: protected boolean processSocketWithOptions(long socket) {
1181: try {
1182: if (executor == null) {
1183: getWorkerThread().assignWithOptions(socket);
1184: } else {
1185: executor
1186: .execute(new SocketWithOptionsProcessor(socket));
1187: }
1188: } catch (Throwable t) {
1189: // This means we got an OOM or similar creating a thread, or that
1190: // the pool and its queue are full
1191: log.error(sm.getString("endpoint.process.fail"), t);
1192: return false;
1193: }
1194: return true;
1195: }
1196:
1197: /**
1198: * Process given socket.
1199: */
1200: protected boolean processSocket(long socket) {
1201: try {
1202: if (executor == null) {
1203: getWorkerThread().assign(socket);
1204: } else {
1205: executor.execute(new SocketProcessor(socket));
1206: }
1207: } catch (Throwable t) {
1208: // This means we got an OOM or similar creating a thread, or that
1209: // the pool and its queue are full
1210: log.error(sm.getString("endpoint.process.fail"), t);
1211: return false;
1212: }
1213: return true;
1214: }
1215:
1216: /**
1217: * Process given socket for an event.
1218: */
1219: protected boolean processSocket(long socket, SocketStatus status) {
1220: try {
1221: if (executor == null) {
1222: getWorkerThread().assign(socket, status);
1223: } else {
1224: executor.execute(new SocketEventProcessor(socket,
1225: status));
1226: }
1227: } catch (Throwable t) {
1228: // This means we got an OOM or similar creating a thread, or that
1229: // the pool and its queue are full
1230: log.error(sm.getString("endpoint.process.fail"), t);
1231: return false;
1232: }
1233: return true;
1234: }
1235:
1236: // --------------------------------------------------- Acceptor Inner Class
1237:
1238: /**
1239: * Server socket acceptor thread.
1240: */
1241: protected class Acceptor implements Runnable {
1242:
1243: /**
1244: * The background thread that listens for incoming TCP/IP connections and
1245: * hands them off to an appropriate processor.
1246: */
1247: public void run() {
1248:
1249: // Loop until we receive a shutdown command
1250: while (running) {
1251:
1252: // Loop if endpoint is paused
1253: while (paused) {
1254: try {
1255: Thread.sleep(1000);
1256: } catch (InterruptedException e) {
1257: // Ignore
1258: }
1259: }
1260:
1261: try {
1262: // Accept the next incoming connection from the server socket
1263: long socket = Socket.accept(serverSock);
1264: // Hand this socket off to an appropriate processor
1265: if (!processSocketWithOptions(socket)) {
1266: // Close socket and pool right away
1267: Socket.destroy(socket);
1268: }
1269: } catch (Throwable t) {
1270: log.error(sm.getString("endpoint.accept.fail"), t);
1271: }
1272:
1273: // The processor will recycle itself when it finishes
1274:
1275: }
1276:
1277: }
1278:
1279: }
1280:
1281: // ----------------------------------------------------- Poller Inner Class
1282:
1283: /**
1284: * Poller class.
1285: */
1286: public class Poller implements Runnable {
1287:
1288: protected long serverPollset = 0;
1289: protected long pool = 0;
1290: protected long[] desc;
1291:
1292: protected long[] addS;
1293: protected int addCount = 0;
1294:
1295: protected boolean comet = true;
1296:
1297: protected int keepAliveCount = 0;
1298:
1299: public int getKeepAliveCount() {
1300: return keepAliveCount;
1301: }
1302:
1303: public Poller(boolean comet) {
1304: this .comet = comet;
1305: }
1306:
1307: /**
1308: * Create the poller. With some versions of APR, the maximum poller size will
1309: * be 62 (recompiling APR is necessary to remove this limitation).
1310: */
1311: protected void init() {
1312: pool = Pool.create(serverSockPool);
1313: int size = pollerSize / pollerThreadCount;
1314: int timeout = keepAliveTimeout;
1315: if (timeout < 0) {
1316: timeout = soTimeout;
1317: }
1318: serverPollset = allocatePoller(size, pool, timeout);
1319: if (serverPollset == 0 && size > 1024) {
1320: size = 1024;
1321: serverPollset = allocatePoller(size, pool, timeout);
1322: }
1323: if (serverPollset == 0) {
1324: size = 62;
1325: serverPollset = allocatePoller(size, pool, timeout);
1326: }
1327: desc = new long[size * 2];
1328: keepAliveCount = 0;
1329: addS = new long[size];
1330: addCount = 0;
1331: }
1332:
1333: /**
1334: * Destroy the poller.
1335: */
1336: protected void destroy() {
1337: // Wait for polltime before doing anything, so that the poller threads
1338: // exit, otherwise parallel descturction of sockets which are still
1339: // in the poller can cause problems
1340: try {
1341: synchronized (this ) {
1342: this .wait(pollTime / 1000);
1343: }
1344: } catch (InterruptedException e) {
1345: // Ignore
1346: }
1347: // Close all sockets in the add queue
1348: for (int i = 0; i < addCount; i++) {
1349: if (comet) {
1350: processSocket(addS[i], SocketStatus.STOP);
1351: } else {
1352: Socket.destroy(addS[i]);
1353: }
1354: }
1355: // Close all sockets still in the poller
1356: int rv = Poll.pollset(serverPollset, desc);
1357: if (rv > 0) {
1358: for (int n = 0; n < rv; n++) {
1359: if (comet) {
1360: processSocket(desc[n * 2 + 1],
1361: SocketStatus.STOP);
1362: } else {
1363: Socket.destroy(desc[n * 2 + 1]);
1364: }
1365: }
1366: }
1367: Pool.destroy(pool);
1368: keepAliveCount = 0;
1369: addCount = 0;
1370: }
1371:
1372: /**
1373: * Add specified socket and associated pool to the poller. The socket will
1374: * be added to a temporary array, and polled first after a maximum amount
1375: * of time equal to pollTime (in most cases, latency will be much lower,
1376: * however).
1377: *
1378: * @param socket to add to the poller
1379: */
1380: public void add(long socket) {
1381: synchronized (this ) {
1382: // Add socket to the list. Newly added sockets will wait
1383: // at most for pollTime before being polled
1384: if (addCount >= addS.length) {
1385: // Can't do anything: close the socket right away
1386: if (comet) {
1387: processSocket(socket, SocketStatus.ERROR);
1388: } else {
1389: Socket.destroy(socket);
1390: }
1391: return;
1392: }
1393: addS[addCount] = socket;
1394: addCount++;
1395: this .notify();
1396: }
1397: }
1398:
1399: /**
1400: * The background thread that listens for incoming TCP/IP connections and
1401: * hands them off to an appropriate processor.
1402: */
1403: public void run() {
1404:
1405: long maintainTime = 0;
1406: // Loop until we receive a shutdown command
1407: while (running) {
1408: // Loop if endpoint is paused
1409: while (paused) {
1410: try {
1411: Thread.sleep(1000);
1412: } catch (InterruptedException e) {
1413: // Ignore
1414: }
1415: }
1416:
1417: while (keepAliveCount < 1 && addCount < 1) {
1418: // Reset maintain time.
1419: maintainTime = 0;
1420: try {
1421: synchronized (this ) {
1422: this .wait();
1423: }
1424: } catch (InterruptedException e) {
1425: // Ignore
1426: }
1427: }
1428:
1429: try {
1430: // Add sockets which are waiting to the poller
1431: if (addCount > 0) {
1432: synchronized (this ) {
1433: for (int i = (addCount - 1); i >= 0; i--) {
1434: int rv = Poll.add(serverPollset,
1435: addS[i], Poll.APR_POLLIN);
1436: if (rv == Status.APR_SUCCESS) {
1437: keepAliveCount++;
1438: } else {
1439: // Can't do anything: close the socket right away
1440: if (comet) {
1441: processSocket(addS[i],
1442: SocketStatus.ERROR);
1443: } else {
1444: Socket.destroy(addS[i]);
1445: }
1446: }
1447: }
1448: addCount = 0;
1449: }
1450: }
1451:
1452: maintainTime += pollTime;
1453: // Pool for the specified interval
1454: int rv = Poll.poll(serverPollset, pollTime, desc,
1455: true);
1456: if (rv > 0) {
1457: keepAliveCount -= rv;
1458: for (int n = 0; n < rv; n++) {
1459: // Check for failed sockets and hand this socket off to a worker
1460: if (((desc[n * 2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1461: || ((desc[n * 2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
1462: || (comet && (!processSocket(
1463: desc[n * 2 + 1],
1464: SocketStatus.OPEN)))
1465: || (!comet && (!processSocket(desc[n * 2 + 1])))) {
1466: // Close socket and clear pool
1467: if (comet) {
1468: processSocket(desc[n * 2 + 1],
1469: SocketStatus.DISCONNECT);
1470: } else {
1471: Socket.destroy(desc[n * 2 + 1]);
1472: }
1473: continue;
1474: }
1475: }
1476: } else if (rv < 0) {
1477: int errn = -rv;
1478: /* Any non timeup or interrupted error is critical */
1479: if ((errn != Status.TIMEUP)
1480: && (errn != Status.EINTR)) {
1481: if (errn > Status.APR_OS_START_USERERR) {
1482: errn -= Status.APR_OS_START_USERERR;
1483: }
1484: log.error(sm.getString(
1485: "endpoint.poll.fail", "" + errn,
1486: Error.strerror(errn)));
1487: // Handle poll critical failure
1488: synchronized (this ) {
1489: destroy();
1490: init();
1491: }
1492: continue;
1493: }
1494: }
1495: if (soTimeout > 0 && maintainTime > 1000000L
1496: && running) {
1497: rv = Poll.maintain(serverPollset, desc, true);
1498: maintainTime = 0;
1499: if (rv > 0) {
1500: keepAliveCount -= rv;
1501: for (int n = 0; n < rv; n++) {
1502: // Close socket and clear pool
1503: if (comet) {
1504: processSocket(desc[n],
1505: SocketStatus.TIMEOUT);
1506: } else {
1507: Socket.destroy(desc[n]);
1508: }
1509: }
1510: }
1511: }
1512: } catch (Throwable t) {
1513: log.error(sm.getString("endpoint.poll.error"), t);
1514: }
1515:
1516: }
1517:
1518: synchronized (this ) {
1519: this .notifyAll();
1520: }
1521:
1522: }
1523:
1524: }
1525:
1526: // ----------------------------------------------------- Worker Inner Class
1527:
1528: /**
1529: * Server processor class.
1530: */
1531: protected class Worker implements Runnable {
1532:
1533: protected Thread thread = null;
1534: protected boolean available = false;
1535: protected long socket = 0;
1536: protected SocketStatus status = null;
1537: protected boolean options = false;
1538:
1539: /**
1540: * Process an incoming TCP/IP connection on the specified socket. Any
1541: * exception that occurs during processing must be logged and swallowed.
1542: * <b>NOTE</b>: This method is called from our Connector's thread. We
1543: * must assign it to our own thread so that multiple simultaneous
1544: * requests can be handled.
1545: *
1546: * @param socket TCP socket to process
1547: */
1548: protected synchronized void assignWithOptions(long socket) {
1549:
1550: // Wait for the Processor to get the previous Socket
1551: while (available) {
1552: try {
1553: wait();
1554: } catch (InterruptedException e) {
1555: }
1556: }
1557:
1558: // Store the newly available Socket and notify our thread
1559: this .socket = socket;
1560: status = null;
1561: options = true;
1562: available = true;
1563: notifyAll();
1564:
1565: }
1566:
1567: /**
1568: * Process an incoming TCP/IP connection on the specified socket. Any
1569: * exception that occurs during processing must be logged and swallowed.
1570: * <b>NOTE</b>: This method is called from our Connector's thread. We
1571: * must assign it to our own thread so that multiple simultaneous
1572: * requests can be handled.
1573: *
1574: * @param socket TCP socket to process
1575: */
1576: protected synchronized void assign(long socket) {
1577:
1578: // Wait for the Processor to get the previous Socket
1579: while (available) {
1580: try {
1581: wait();
1582: } catch (InterruptedException e) {
1583: }
1584: }
1585:
1586: // Store the newly available Socket and notify our thread
1587: this .socket = socket;
1588: status = null;
1589: options = false;
1590: available = true;
1591: notifyAll();
1592:
1593: }
1594:
1595: protected synchronized void assign(long socket,
1596: SocketStatus status) {
1597:
1598: // Wait for the Processor to get the previous Socket
1599: while (available) {
1600: try {
1601: wait();
1602: } catch (InterruptedException e) {
1603: }
1604: }
1605:
1606: // Store the newly available Socket and notify our thread
1607: this .socket = socket;
1608: this .status = status;
1609: options = false;
1610: available = true;
1611: notifyAll();
1612:
1613: }
1614:
1615: /**
1616: * Await a newly assigned Socket from our Connector, or <code>null</code>
1617: * if we are supposed to shut down.
1618: */
1619: protected synchronized long await() {
1620:
1621: // Wait for the Connector to provide a new Socket
1622: while (!available) {
1623: try {
1624: wait();
1625: } catch (InterruptedException e) {
1626: }
1627: }
1628:
1629: // Notify the Connector that we have received this Socket
1630: long socket = this .socket;
1631: available = false;
1632: notifyAll();
1633:
1634: return (socket);
1635:
1636: }
1637:
1638: /**
1639: * The background thread that listens for incoming TCP/IP connections and
1640: * hands them off to an appropriate processor.
1641: */
1642: public void run() {
1643:
1644: // Process requests until we receive a shutdown signal
1645: while (running) {
1646:
1647: // Wait for the next socket to be assigned
1648: long socket = await();
1649: if (socket == 0)
1650: continue;
1651:
1652: if (!deferAccept && options) {
1653: if (setSocketOptions(socket)) {
1654: getPoller().add(socket);
1655: } else {
1656: // Close socket and pool
1657: Socket.destroy(socket);
1658: socket = 0;
1659: }
1660: } else {
1661:
1662: // Process the request from this socket
1663: if ((status != null)
1664: && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
1665: // Close socket and pool
1666: Socket.destroy(socket);
1667: socket = 0;
1668: } else if ((status == null)
1669: && ((options && !setSocketOptions(socket)) || handler
1670: .process(socket) == Handler.SocketState.CLOSED)) {
1671: // Close socket and pool
1672: Socket.destroy(socket);
1673: socket = 0;
1674: }
1675: }
1676:
1677: // Finish up this request
1678: recycleWorkerThread(this );
1679:
1680: }
1681:
1682: }
1683:
1684: /**
1685: * Start the background processing thread.
1686: */
1687: public void start() {
1688: thread = new Thread(this );
1689: thread.setName(getName() + "-" + (++curThreads));
1690: thread.setDaemon(true);
1691: thread.start();
1692: }
1693:
1694: }
1695:
1696: // ----------------------------------------------- SendfileData Inner Class
1697:
1698: /**
1699: * SendfileData class.
1700: */
1701: public static class SendfileData {
1702: // File
1703: public String fileName;
1704: public long fd;
1705: public long fdpool;
1706: // Range information
1707: public long start;
1708: public long end;
1709: // Socket and socket pool
1710: public long socket;
1711: // Position
1712: public long pos;
1713: // KeepAlive flag
1714: public boolean keepAlive;
1715: }
1716:
1717: // --------------------------------------------------- Sendfile Inner Class
1718:
1719: /**
1720: * Sendfile class.
1721: */
1722: public class Sendfile implements Runnable {
1723:
1724: protected long sendfilePollset = 0;
1725: protected long pool = 0;
1726: protected long[] desc;
1727: protected HashMap<Long, SendfileData> sendfileData;
1728:
1729: protected int sendfileCount;
1730:
1731: public int getSendfileCount() {
1732: return sendfileCount;
1733: }
1734:
1735: protected ArrayList<SendfileData> addS;
1736:
1737: /**
1738: * Create the sendfile poller. With some versions of APR, the maximum poller size will
1739: * be 62 (reocmpiling APR is necessary to remove this limitation).
1740: */
1741: protected void init() {
1742: pool = Pool.create(serverSockPool);
1743: int size = sendfileSize / sendfileThreadCount;
1744: sendfilePollset = allocatePoller(size, pool, soTimeout);
1745: if (sendfilePollset == 0 && size > 1024) {
1746: size = 1024;
1747: sendfilePollset = allocatePoller(size, pool, soTimeout);
1748: }
1749: if (sendfilePollset == 0) {
1750: size = 62;
1751: sendfilePollset = allocatePoller(size, pool, soTimeout);
1752: }
1753: desc = new long[size * 2];
1754: sendfileData = new HashMap<Long, SendfileData>(size);
1755: addS = new ArrayList<SendfileData>();
1756: }
1757:
1758: /**
1759: * Destroy the poller.
1760: */
1761: protected void destroy() {
1762: // Wait for polltime before doing anything, so that the poller threads
1763: // exit, otherwise parallel descturction of sockets which are still
1764: // in the poller can cause problems
1765: try {
1766: synchronized (this ) {
1767: this .wait(pollTime / 1000);
1768: }
1769: } catch (InterruptedException e) {
1770: // Ignore
1771: }
1772: // Close any socket remaining in the add queue
1773: for (int i = (addS.size() - 1); i >= 0; i--) {
1774: SendfileData data = addS.get(i);
1775: Socket.destroy(data.socket);
1776: }
1777: // Close all sockets still in the poller
1778: int rv = Poll.pollset(sendfilePollset, desc);
1779: if (rv > 0) {
1780: for (int n = 0; n < rv; n++) {
1781: Socket.destroy(desc[n * 2 + 1]);
1782: }
1783: }
1784: Pool.destroy(pool);
1785: sendfileData.clear();
1786: }
1787:
1788: /**
1789: * Add the sendfile data to the sendfile poller. Note that in most cases,
1790: * the initial non blocking calls to sendfile will return right away, and
1791: * will be handled asynchronously inside the kernel. As a result,
1792: * the poller will never be used.
1793: *
1794: * @param data containing the reference to the data which should be snet
1795: * @return true if all the data has been sent right away, and false
1796: * otherwise
1797: */
1798: public boolean add(SendfileData data) {
1799: // Initialize fd from data given
1800: try {
1801: data.fdpool = Socket.pool(data.socket);
1802: data.fd = File.open(data.fileName, File.APR_FOPEN_READ
1803: | File.APR_FOPEN_SENDFILE_ENABLED
1804: | File.APR_FOPEN_BINARY, 0, data.fdpool);
1805: data.pos = data.start;
1806: // Set the socket to nonblocking mode
1807: Socket.timeoutSet(data.socket, 0);
1808: while (true) {
1809: long nw = Socket.sendfilen(data.socket, data.fd,
1810: data.pos, data.end - data.pos, 0);
1811: if (nw < 0) {
1812: if (!(-nw == Status.EAGAIN)) {
1813: Socket.destroy(data.socket);
1814: data.socket = 0;
1815: return false;
1816: } else {
1817: // Break the loop and add the socket to poller.
1818: break;
1819: }
1820: } else {
1821: data.pos = data.pos + nw;
1822: if (data.pos >= data.end) {
1823: // Entire file has been sent
1824: Pool.destroy(data.fdpool);
1825: // Set back socket to blocking mode
1826: Socket.timeoutSet(data.socket,
1827: soTimeout * 1000);
1828: return true;
1829: }
1830: }
1831: }
1832: } catch (Exception e) {
1833: log.error(sm.getString("endpoint.sendfile.error"), e);
1834: return false;
1835: }
1836: // Add socket to the list. Newly added sockets will wait
1837: // at most for pollTime before being polled
1838: synchronized (this ) {
1839: addS.add(data);
1840: this .notify();
1841: }
1842: return false;
1843: }
1844:
1845: /**
1846: * Remove socket from the poller.
1847: *
1848: * @param data the sendfile data which should be removed
1849: */
1850: protected void remove(SendfileData data) {
1851: int rv = Poll.remove(sendfilePollset, data.socket);
1852: if (rv == Status.APR_SUCCESS) {
1853: sendfileCount--;
1854: }
1855: sendfileData.remove(data);
1856: }
1857:
1858: /**
1859: * The background thread that listens for incoming TCP/IP connections and
1860: * hands them off to an appropriate processor.
1861: */
1862: public void run() {
1863:
1864: // Loop until we receive a shutdown command
1865: while (running) {
1866:
1867: // Loop if endpoint is paused
1868: while (paused) {
1869: try {
1870: Thread.sleep(1000);
1871: } catch (InterruptedException e) {
1872: // Ignore
1873: }
1874: }
1875:
1876: while (sendfileCount < 1 && addS.size() < 1) {
1877: try {
1878: synchronized (this ) {
1879: this .wait();
1880: }
1881: } catch (InterruptedException e) {
1882: // Ignore
1883: }
1884: }
1885:
1886: try {
1887: // Add socket to the poller
1888: if (addS.size() > 0) {
1889: synchronized (this ) {
1890: for (int i = (addS.size() - 1); i >= 0; i--) {
1891: SendfileData data = addS.get(i);
1892: int rv = Poll.add(sendfilePollset,
1893: data.socket, Poll.APR_POLLOUT);
1894: if (rv == Status.APR_SUCCESS) {
1895: sendfileData.put(new Long(
1896: data.socket), data);
1897: sendfileCount++;
1898: } else {
1899: log
1900: .warn(sm
1901: .getString(
1902: "endpoint.sendfile.addfail",
1903: "" + rv,
1904: Error
1905: .strerror(rv)));
1906: // Can't do anything: close the socket right away
1907: Socket.destroy(data.socket);
1908: }
1909: }
1910: addS.clear();
1911: }
1912: }
1913: // Pool for the specified interval
1914: int rv = Poll.poll(sendfilePollset, pollTime, desc,
1915: false);
1916: if (rv > 0) {
1917: for (int n = 0; n < rv; n++) {
1918: // Get the sendfile state
1919: SendfileData state = sendfileData
1920: .get(new Long(desc[n * 2 + 1]));
1921: // Problem events
1922: if (((desc[n * 2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1923: || ((desc[n * 2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
1924: // Close socket and clear pool
1925: remove(state);
1926: // Destroy file descriptor pool, which should close the file
1927: // Close the socket, as the reponse would be incomplete
1928: Socket.destroy(state.socket);
1929: continue;
1930: }
1931: // Write some data using sendfile
1932: long nw = Socket.sendfilen(state.socket,
1933: state.fd, state.pos, state.end
1934: - state.pos, 0);
1935: if (nw < 0) {
1936: // Close socket and clear pool
1937: remove(state);
1938: // Close the socket, as the reponse would be incomplete
1939: // This will close the file too.
1940: Socket.destroy(state.socket);
1941: continue;
1942: }
1943:
1944: state.pos = state.pos + nw;
1945: if (state.pos >= state.end) {
1946: remove(state);
1947: if (state.keepAlive) {
1948: // Destroy file descriptor pool, which should close the file
1949: Pool.destroy(state.fdpool);
1950: Socket.timeoutSet(state.socket,
1951: soTimeout * 1000);
1952: // If all done hand this socket off to a worker for
1953: // processing of further requests
1954: if (!processSocket(state.socket)) {
1955: Socket.destroy(state.socket);
1956: }
1957: } else {
1958: // Close the socket since this is
1959: // the end of not keep-alive request.
1960: Socket.destroy(state.socket);
1961: }
1962: }
1963: }
1964: } else if (rv < 0) {
1965: int errn = -rv;
1966: /* Any non timeup or interrupted error is critical */
1967: if ((errn != Status.TIMEUP)
1968: && (errn != Status.EINTR)) {
1969: if (errn > Status.APR_OS_START_USERERR) {
1970: errn -= Status.APR_OS_START_USERERR;
1971: }
1972: log.error(sm.getString(
1973: "endpoint.poll.fail", "" + errn,
1974: Error.strerror(errn)));
1975: // Handle poll critical failure
1976: synchronized (this ) {
1977: destroy();
1978: init();
1979: }
1980: continue;
1981: }
1982: }
1983: /* TODO: See if we need to call the maintain for sendfile poller */
1984: } catch (Throwable t) {
1985: log.error(sm.getString("endpoint.poll.error"), t);
1986: }
1987: }
1988:
1989: synchronized (this ) {
1990: this .notifyAll();
1991: }
1992:
1993: }
1994:
1995: }
1996:
1997: // ------------------------------------------------ Handler Inner Interface
1998:
1999: /**
2000: * Bare bones interface used for socket processing. Per thread data is to be
2001: * stored in the ThreadWithAttributes extra folders, or alternately in
2002: * thread local fields.
2003: */
2004: public interface Handler {
2005: public enum SocketState {
2006: OPEN, CLOSED, LONG
2007: }
2008:
2009: public SocketState process(long socket);
2010:
2011: public SocketState event(long socket, SocketStatus status);
2012: }
2013:
2014: // ------------------------------------------------- WorkerStack Inner Class
2015:
2016: public class WorkerStack {
2017:
2018: protected Worker[] workers = null;
2019: protected int end = 0;
2020:
2021: public WorkerStack(int size) {
2022: workers = new Worker[size];
2023: }
2024:
2025: /**
2026: * Put the object into the queue.
2027: *
2028: * @param object the object to be appended to the queue (first element).
2029: */
2030: public void push(Worker worker) {
2031: workers[end++] = worker;
2032: }
2033:
2034: /**
2035: * Get the first object out of the queue. Return null if the queue
2036: * is empty.
2037: */
2038: public Worker pop() {
2039: if (end > 0) {
2040: return workers[--end];
2041: }
2042: return null;
2043: }
2044:
2045: /**
2046: * Get the first object out of the queue, Return null if the queue
2047: * is empty.
2048: */
2049: public Worker peek() {
2050: return workers[end];
2051: }
2052:
2053: /**
2054: * Is the queue empty?
2055: */
2056: public boolean isEmpty() {
2057: return (end == 0);
2058: }
2059:
2060: /**
2061: * How many elements are there in this queue?
2062: */
2063: public int size() {
2064: return (end);
2065: }
2066: }
2067:
// ----------------------------------- SocketWithOptionsProcessor Inner Class
2069:
2070: /**
2071: * This class is the equivalent of the Worker, but will simply use in an
2072: * external Executor thread pool. This will also set the socket options
2073: * and do the handshake.
2074: */
2075: protected class SocketWithOptionsProcessor implements Runnable {
2076:
2077: protected long socket = 0;
2078:
2079: public SocketWithOptionsProcessor(long socket) {
2080: this .socket = socket;
2081: }
2082:
2083: public void run() {
2084:
2085: if (!deferAccept) {
2086: if (setSocketOptions(socket)) {
2087: getPoller().add(socket);
2088: } else {
2089: // Close socket and pool
2090: Socket.destroy(socket);
2091: socket = 0;
2092: }
2093: } else {
2094: // Process the request from this socket
2095: if (!setSocketOptions(socket)
2096: || handler.process(socket) == Handler.SocketState.CLOSED) {
2097: // Close socket and pool
2098: Socket.destroy(socket);
2099: socket = 0;
2100: }
2101: }
2102:
2103: }
2104:
2105: }
2106:
2107: // ---------------------------------------------- SocketProcessor Inner Class
2108:
2109: /**
2110: * This class is the equivalent of the Worker, but will simply use in an
2111: * external Executor thread pool.
2112: */
2113: protected class SocketProcessor implements Runnable {
2114:
2115: protected long socket = 0;
2116:
2117: public SocketProcessor(long socket) {
2118: this .socket = socket;
2119: }
2120:
2121: public void run() {
2122:
2123: // Process the request from this socket
2124: if (handler.process(socket) == Handler.SocketState.CLOSED) {
2125: // Close socket and pool
2126: Socket.destroy(socket);
2127: socket = 0;
2128: }
2129:
2130: }
2131:
2132: }
2133:
2134: // --------------------------------------- SocketEventProcessor Inner Class
2135:
2136: /**
2137: * This class is the equivalent of the Worker, but will simply use in an
2138: * external Executor thread pool.
2139: */
2140: protected class SocketEventProcessor implements Runnable {
2141:
2142: protected long socket = 0;
2143: protected SocketStatus status = null;
2144:
2145: public SocketEventProcessor(long socket, SocketStatus status) {
2146: this .socket = socket;
2147: this .status = status;
2148: }
2149:
2150: public void run() {
2151:
2152: // Process the request from this socket
2153: if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
2154: // Close socket and pool
2155: Socket.destroy(socket);
2156: socket = 0;
2157: }
2158:
2159: }
2160:
2161: }
2162:
2163: }
|