0001: /*
0002: * Licensed to the Apache Software Foundation (ASF) under one or more
0003: * contributor license agreements. See the NOTICE file distributed with
0004: * this work for additional information regarding copyright ownership.
0005: * The ASF licenses this file to You under the Apache License, Version 2.0
0006: * (the "License"); you may not use this file except in compliance with
0007: * the License. You may obtain a copy of the License at
0008: *
0009: * http://www.apache.org/licenses/LICENSE-2.0
0010: *
0011: * Unless required by applicable law or agreed to in writing, software
0012: * distributed under the License is distributed on an "AS IS" BASIS,
0013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014: * See the License for the specific language governing permissions and
0015: * limitations under the License.
0016: */
0017:
0018: package org.apache.tomcat.util.net;
0019:
0020: import java.io.File;
0021: import java.io.FileInputStream;
0022: import java.io.IOException;
0023: import java.net.InetAddress;
0024: import java.net.InetSocketAddress;
0025: import java.net.Socket;
0026: import java.nio.ByteBuffer;
0027: import java.nio.channels.CancelledKeyException;
0028: import java.nio.channels.FileChannel;
0029: import java.nio.channels.SelectionKey;
0030: import java.nio.channels.Selector;
0031: import java.nio.channels.ServerSocketChannel;
0032: import java.nio.channels.SocketChannel;
0033: import java.security.KeyStore;
0034: import java.util.Collection;
0035: import java.util.Iterator;
0036: import java.util.Set;
0037: import java.util.StringTokenizer;
0038: import java.util.concurrent.ConcurrentLinkedQueue;
0039: import java.util.concurrent.CountDownLatch;
0040: import java.util.concurrent.Executor;
0041: import java.util.concurrent.LinkedBlockingQueue;
0042: import java.util.concurrent.ThreadFactory;
0043: import java.util.concurrent.ThreadPoolExecutor;
0044: import java.util.concurrent.TimeUnit;
0045: import java.util.concurrent.atomic.AtomicInteger;
0046: import java.util.concurrent.atomic.AtomicLong;
0047: import javax.net.ssl.KeyManagerFactory;
0048: import javax.net.ssl.SSLContext;
0049: import javax.net.ssl.SSLEngine;
0050: import javax.net.ssl.TrustManagerFactory;
0051:
0052: import org.apache.juli.logging.Log;
0053: import org.apache.juli.logging.LogFactory;
0054: import org.apache.tomcat.util.IntrospectionUtils;
0055: import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
0056: import org.apache.tomcat.util.res.StringManager;
0057:
0058: /**
0059: * NIO tailored thread pool, providing the following services:
0060: * <ul>
0061: * <li>Socket acceptor thread</li>
0062: * <li>Socket poller thread</li>
0063: * <li>Worker threads pool</li>
0064: * </ul>
0065: *
0066: * When switching to Java 5, there's an opportunity to use the virtual
0067: * machine's thread pool.
0068: *
0069: * @author Mladen Turk
0070: * @author Remy Maucherat
0071: * @author Filip Hanik
0072: */
0073: public class NioEndpoint {
0074:
0075: // -------------------------------------------------------------- Constants
0076:
0077: protected static Log log = LogFactory.getLog(NioEndpoint.class);
0078:
0079: protected static StringManager sm = StringManager
0080: .getManager("org.apache.tomcat.util.net.res");
0081:
0082: /**
0083: * The Request attribute key for the cipher suite.
0084: */
0085: public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
0086:
0087: /**
0088: * The Request attribute key for the key size.
0089: */
0090: public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
0091:
0092: /**
0093: * The Request attribute key for the client certificate chain.
0094: */
0095: public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
0096:
0097: /**
0098: * The Request attribute key for the session id.
0099: * This one is a Tomcat extension to the Servlet spec.
0100: */
0101: public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
0102:
0103: public static final int OP_REGISTER = -1; //register interest op
0104:
0105: // ----------------------------------------------------------------- Fields
0106:
0107: /**
0108: * Available workers.
0109: */
0110: protected WorkerStack workers = null;
0111:
0112: /**
0113: * Running state of the endpoint.
0114: */
0115: protected volatile boolean running = false;
0116:
0117: /**
0118: * Will be set to true whenever the endpoint is paused.
0119: */
0120: protected volatile boolean paused = false;
0121:
0122: /**
0123: * Track the initialization state of the endpoint.
0124: */
0125: protected boolean initialized = false;
0126:
0127: /**
0128: * Current worker threads busy count.
0129: */
0130: protected int curThreadsBusy = 0;
0131:
0132: /**
0133: * Current worker threads count.
0134: */
0135: protected int curThreads = 0;
0136:
0137: /**
0138: * Sequence number used to generate thread names.
0139: */
0140: protected int sequence = 0;
0141:
0142: protected NioSelectorPool selectorPool = new NioSelectorPool();
0143:
0144: /**
0145: * Server socket "pointer".
0146: */
0147: protected ServerSocketChannel serverSock = null;
0148:
0149: /**
0150: * use send file
0151: */
0152: protected boolean useSendfile = true;
0153:
0154: /**
0155: * The size of the OOM parachute.
0156: */
0157: protected int oomParachute = 1024 * 1024;
0158: /**
0159: * The oom parachute, when an OOM error happens,
0160: * will release the data, giving the JVM instantly
0161: * a chunk of data to be able to recover with.
0162: */
0163: protected byte[] oomParachuteData = null;
0164:
0165: /**
0166: * Make sure this string has already been allocated
0167: */
0168: protected static final String oomParachuteMsg = "SEVERE:Memory usage is low, parachute is non existent, your system may start failing.";
0169:
0170: /**
0171: * Keep track of OOM warning messages.
0172: */
0173: long lastParachuteCheck = System.currentTimeMillis();
0174:
0175: /**
0176: * Cache for SocketProcessor objects
0177: */
0178: protected ConcurrentLinkedQueue<SocketProcessor> processorCache = new ConcurrentLinkedQueue<SocketProcessor>() {
0179: protected AtomicInteger size = new AtomicInteger(0);
0180:
0181: public boolean offer(SocketProcessor sc) {
0182: sc.reset(null, null);
0183: boolean offer = socketProperties.getProcessorCache() == -1 ? true
0184: : size.get() < socketProperties.getProcessorCache();
0185: //avoid over growing our cache or add after we have stopped
0186: if (running && (!paused) && (offer)) {
0187: boolean result = super .offer(sc);
0188: if (result) {
0189: size.incrementAndGet();
0190: }
0191: return result;
0192: } else
0193: return false;
0194: }
0195:
0196: public SocketProcessor poll() {
0197: SocketProcessor result = super .poll();
0198: if (result != null) {
0199: size.decrementAndGet();
0200: }
0201: return result;
0202: }
0203:
0204: public void clear() {
0205: super .clear();
0206: size.set(0);
0207: }
0208: };
0209:
0210: /**
0211: * Cache for key attachment objects
0212: */
0213: protected ConcurrentLinkedQueue<KeyAttachment> keyCache = new ConcurrentLinkedQueue<KeyAttachment>() {
0214: protected AtomicInteger size = new AtomicInteger(0);
0215:
0216: public boolean offer(KeyAttachment ka) {
0217: ka.reset();
0218: boolean offer = socketProperties.getKeyCache() == -1 ? true
0219: : size.get() < socketProperties.getKeyCache();
0220: //avoid over growing our cache or add after we have stopped
0221: if (running && (!paused) && (offer)) {
0222: boolean result = super .offer(ka);
0223: if (result) {
0224: size.incrementAndGet();
0225: }
0226: return result;
0227: } else
0228: return false;
0229: }
0230:
0231: public KeyAttachment poll() {
0232: KeyAttachment result = super .poll();
0233: if (result != null) {
0234: size.decrementAndGet();
0235: }
0236: return result;
0237: }
0238:
0239: public void clear() {
0240: super .clear();
0241: size.set(0);
0242: }
0243: };
0244:
0245: /**
0246: * Cache for poller events
0247: */
0248: protected ConcurrentLinkedQueue<PollerEvent> eventCache = new ConcurrentLinkedQueue<PollerEvent>() {
0249: protected AtomicInteger size = new AtomicInteger(0);
0250:
0251: public boolean offer(PollerEvent pe) {
0252: pe.reset();
0253: boolean offer = socketProperties.getEventCache() == -1 ? true
0254: : size.get() < socketProperties.getEventCache();
0255: //avoid over growing our cache or add after we have stopped
0256: if (running && (!paused) && (offer)) {
0257: boolean result = super .offer(pe);
0258: if (result) {
0259: size.incrementAndGet();
0260: }
0261: return result;
0262: } else
0263: return false;
0264: }
0265:
0266: public PollerEvent poll() {
0267: PollerEvent result = super .poll();
0268: if (result != null) {
0269: size.decrementAndGet();
0270: }
0271: return result;
0272: }
0273:
0274: public void clear() {
0275: super .clear();
0276: size.set(0);
0277: }
0278: };
0279:
0280: /**
0281: * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
0282: */
0283: protected ConcurrentLinkedQueue<NioChannel> nioChannels = new ConcurrentLinkedQueue<NioChannel>() {
0284: protected AtomicInteger size = new AtomicInteger(0);
0285: protected AtomicInteger bytes = new AtomicInteger(0);
0286:
0287: public boolean offer(NioChannel socket) {
0288: boolean offer = socketProperties.getBufferPool() == -1 ? true
0289: : size.get() < socketProperties.getBufferPool();
0290: offer = offer
0291: && (socketProperties.getBufferPoolSize() == -1 ? true
0292: : (bytes.get() + socket.getBufferSize()) < socketProperties
0293: .getBufferPoolSize());
0294: //avoid over growing our cache or add after we have stopped
0295: if (running && (!paused) && (offer)) {
0296: boolean result = super .offer(socket);
0297: if (result) {
0298: size.incrementAndGet();
0299: bytes.addAndGet(socket.getBufferSize());
0300: }
0301: return result;
0302: } else
0303: return false;
0304: }
0305:
0306: public NioChannel poll() {
0307: NioChannel result = super .poll();
0308: if (result != null) {
0309: size.decrementAndGet();
0310: bytes.addAndGet(-result.getBufferSize());
0311: }
0312: return result;
0313: }
0314:
0315: public void clear() {
0316: super .clear();
0317: size.set(0);
0318: bytes.set(0);
0319: }
0320: };
0321:
0322: // ------------------------------------------------------------- Properties
0323:
0324: /**
0325: * External Executor based thread pool.
0326: */
0327: protected Executor executor = null;
0328:
0329: public void setExecutor(Executor executor) {
0330: this .executor = executor;
0331: }
0332:
0333: public Executor getExecutor() {
0334: return executor;
0335: }
0336:
0337: protected boolean useExecutor = true;
0338:
0339: public void setUseExecutor(boolean useexec) {
0340: useExecutor = useexec;
0341: }
0342:
0343: public boolean getUseExecutor() {
0344: return useExecutor || (executor != null);
0345: }
0346:
0347: /**
0348: * Maximum amount of worker threads.
0349: */
0350: protected int maxThreads = 400;
0351:
0352: public void setMaxThreads(int maxThreads) {
0353: this .maxThreads = maxThreads;
0354: }
0355:
0356: public int getMaxThreads() {
0357: return maxThreads;
0358: }
0359:
0360: /**
0361: * Priority of the worker threads.
0362: */
0363: protected int threadPriority = Thread.NORM_PRIORITY;
0364:
0365: public void setThreadPriority(int threadPriority) {
0366: this .threadPriority = threadPriority;
0367: }
0368:
0369: public int getThreadPriority() {
0370: return threadPriority;
0371: }
0372:
0373: /**
0374: * Priority of the acceptor threads.
0375: */
0376: protected int acceptorThreadPriority = Thread.NORM_PRIORITY;
0377:
0378: public void setAcceptorThreadPriority(int acceptorThreadPriority) {
0379: this .acceptorThreadPriority = acceptorThreadPriority;
0380: }
0381:
0382: public int getAcceptorThreadPriority() {
0383: return acceptorThreadPriority;
0384: }
0385:
0386: /**
0387: * Priority of the poller threads.
0388: */
0389: protected int pollerThreadPriority = Thread.NORM_PRIORITY;
0390:
0391: public void setPollerThreadPriority(int pollerThreadPriority) {
0392: this .pollerThreadPriority = pollerThreadPriority;
0393: }
0394:
0395: public int getPollerThreadPriority() {
0396: return pollerThreadPriority;
0397: }
0398:
0399: /**
0400: * Server socket port.
0401: */
0402: protected int port;
0403:
0404: public int getPort() {
0405: return port;
0406: }
0407:
0408: public void setPort(int port) {
0409: this .port = port;
0410: }
0411:
0412: /**
0413: * Address for the server socket.
0414: */
0415: protected InetAddress address;
0416:
0417: public InetAddress getAddress() {
0418: return address;
0419: }
0420:
0421: public void setAddress(InetAddress address) {
0422: this .address = address;
0423: }
0424:
0425: /**
0426: * Handling of accepted sockets.
0427: */
0428: protected Handler handler = null;
0429:
0430: public void setHandler(Handler handler) {
0431: this .handler = handler;
0432: }
0433:
0434: public Handler getHandler() {
0435: return handler;
0436: }
0437:
0438: /**
0439: * Allows the server developer to specify the backlog that
0440: * should be used for server sockets. By default, this value
0441: * is 100.
0442: */
0443: protected int backlog = 100;
0444:
0445: public void setBacklog(int backlog) {
0446: if (backlog > 0)
0447: this .backlog = backlog;
0448: }
0449:
0450: public int getBacklog() {
0451: return backlog;
0452: }
0453:
0454: protected SocketProperties socketProperties = new SocketProperties();
0455:
0456: /**
0457: * Socket TCP no delay.
0458: */
0459: public boolean getTcpNoDelay() {
0460: return socketProperties.getTcpNoDelay();
0461: }
0462:
0463: public void setTcpNoDelay(boolean tcpNoDelay) {
0464: socketProperties.setTcpNoDelay(tcpNoDelay);
0465: }
0466:
0467: /**
0468: * Socket linger.
0469: */
0470: public int getSoLinger() {
0471: return socketProperties.getSoLingerTime();
0472: }
0473:
0474: public void setSoLinger(int soLinger) {
0475: socketProperties.setSoLingerTime(soLinger);
0476: socketProperties.setSoLingerOn(soLinger >= 0);
0477: }
0478:
0479: /**
0480: * Socket timeout.
0481: */
0482: public int getSoTimeout() {
0483: return socketProperties.getSoTimeout();
0484: }
0485:
0486: public void setSoTimeout(int soTimeout) {
0487: socketProperties.setSoTimeout(soTimeout);
0488: }
0489:
0490: /**
0491: * The default is true - the created threads will be
0492: * in daemon mode. If set to false, the control thread
0493: * will not be daemon - and will keep the process alive.
0494: */
0495: protected boolean daemon = true;
0496:
0497: public void setDaemon(boolean b) {
0498: daemon = b;
0499: }
0500:
0501: public boolean getDaemon() {
0502: return daemon;
0503: }
0504:
0505: /**
0506: * Name of the thread pool, which will be used for naming child threads.
0507: */
0508: protected String name = "TP";
0509:
0510: public void setName(String name) {
0511: this .name = name;
0512: }
0513:
0514: public String getName() {
0515: return name;
0516: }
0517:
0518: /**
0519: * Allow comet request handling.
0520: */
0521: protected boolean useComet = true;
0522:
0523: public void setUseComet(boolean useComet) {
0524: this .useComet = useComet;
0525: }
0526:
0527: public boolean getUseComet() {
0528: return useComet;
0529: }
0530:
0531: /**
0532: * Acceptor thread count.
0533: */
0534: protected int acceptorThreadCount = 1;
0535:
0536: public void setAcceptorThreadCount(int acceptorThreadCount) {
0537: this .acceptorThreadCount = acceptorThreadCount;
0538: }
0539:
0540: public int getAcceptorThreadCount() {
0541: return acceptorThreadCount;
0542: }
0543:
0544: /**
0545: * Poller thread count.
0546: */
0547: protected int pollerThreadCount = 1;
0548:
0549: public void setPollerThreadCount(int pollerThreadCount) {
0550: this .pollerThreadCount = pollerThreadCount;
0551: }
0552:
0553: public int getPollerThreadCount() {
0554: return pollerThreadCount;
0555: }
0556:
0557: protected long selectorTimeout = 1000;
0558:
0559: public void setSelectorTimeout(long timeout) {
0560: this .selectorTimeout = timeout;
0561: }
0562:
0563: public long getSelectorTimeout() {
0564: return this .selectorTimeout;
0565: }
0566:
0567: /**
0568: * The socket poller.
0569: */
0570: protected Poller[] pollers = null;
0571: protected int pollerRoundRobin = 0;
0572:
0573: public Poller getPoller0() {
0574: pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
0575: Poller poller = pollers[pollerRoundRobin];
0576: return poller;
0577: }
0578:
0579: /**
0580: * The socket poller used for Comet support.
0581: */
0582: public Poller getCometPoller0() {
0583: Poller poller = getPoller0();
0584: return poller;
0585: }
0586:
0587: /**
0588: * Dummy maxSpareThreads property.
0589: */
0590: public int getMaxSpareThreads() {
0591: return Math.min(getMaxThreads(), 5);
0592: }
0593:
0594: /**
0595: * Dummy minSpareThreads property.
0596: */
0597: public int getMinSpareThreads() {
0598: return Math.min(getMaxThreads(), 5);
0599: }
0600:
0601: /**
0602: * Generic properties, introspected
0603: */
0604: public void setProperty(String name, String value) {
0605: final String selectorPoolName = "selectorPool.";
0606: final String socketName = "socket.";
0607: try {
0608: if (name.startsWith(selectorPoolName)) {
0609: IntrospectionUtils.setProperty(selectorPool, name
0610: .substring(selectorPoolName.length()), value);
0611: } else if (name.startsWith(socketName)) {
0612: IntrospectionUtils.setProperty(socketProperties, name
0613: .substring(socketName.length()), value);
0614: } else {
0615: IntrospectionUtils.setProperty(this , name, value);
0616: }
0617: } catch (Exception x) {
0618: log.error("Unable to set attribute \"" + name + "\" to \""
0619: + value + "\"", x);
0620: }
0621: }
0622:
0623: // -------------------- SSL related properties --------------------
0624: protected String keystoreFile = System.getProperty("user.home")
0625: + "/.keystore";
0626:
0627: public String getKeystoreFile() {
0628: return keystoreFile;
0629: }
0630:
0631: public void setKeystoreFile(String s) {
0632: this .keystoreFile = s;
0633: }
0634:
0635: public void setKeystore(String s) {
0636: setKeystoreFile(s);
0637: }
0638:
0639: public String getKeystore() {
0640: return getKeystoreFile();
0641: }
0642:
0643: protected String algorithm = "SunX509";
0644:
0645: public String getAlgorithm() {
0646: return algorithm;
0647: }
0648:
0649: public void setAlgorithm(String s) {
0650: this .algorithm = s;
0651: }
0652:
0653: protected boolean clientAuth = false;
0654:
0655: public boolean getClientAuth() {
0656: return clientAuth;
0657: }
0658:
0659: public void setClientAuth(boolean b) {
0660: this .clientAuth = b;
0661: }
0662:
0663: protected String keystorePass = "changeit";
0664:
0665: public String getKeystorePass() {
0666: return keystorePass;
0667: }
0668:
0669: public void setKeystorePass(String s) {
0670: this .keystorePass = s;
0671: }
0672:
0673: protected String keystoreType = "JKS";
0674:
0675: public String getKeystoreType() {
0676: return keystoreType;
0677: }
0678:
0679: public void setKeystoreType(String s) {
0680: this .keystoreType = s;
0681: }
0682:
0683: protected String sslProtocol = "TLS";
0684:
0685: public String getSslProtocol() {
0686: return sslProtocol;
0687: }
0688:
0689: public void setSslProtocol(String s) {
0690: sslProtocol = s;
0691: }
0692:
0693: protected String sslEnabledProtocols = null; //"TLSv1,SSLv3,SSLv2Hello"
0694: protected String[] sslEnabledProtocolsarr = new String[0];
0695:
0696: public void setSslEnabledProtocols(String s) {
0697: this .sslEnabledProtocols = s;
0698: StringTokenizer t = new StringTokenizer(s, ",");
0699: sslEnabledProtocolsarr = new String[t.countTokens()];
0700: for (int i = 0; i < sslEnabledProtocolsarr.length; i++)
0701: sslEnabledProtocolsarr[i] = t.nextToken();
0702: }
0703:
0704: protected String ciphers = null;
0705: protected String[] ciphersarr = new String[0];
0706:
0707: public String getCiphers() {
0708: return ciphers;
0709: }
0710:
0711: public void setCiphers(String s) {
0712: ciphers = s;
0713: if (s == null)
0714: ciphersarr = new String[0];
0715: else {
0716: StringTokenizer t = new StringTokenizer(s, ",");
0717: ciphersarr = new String[t.countTokens()];
0718: for (int i = 0; i < ciphersarr.length; i++)
0719: ciphersarr[i] = t.nextToken();
0720: }
0721: }
0722:
0723: /**
0724: * SSL engine.
0725: */
0726: protected boolean SSLEnabled = false;
0727:
0728: public boolean isSSLEnabled() {
0729: return SSLEnabled;
0730: }
0731:
0732: public void setSSLEnabled(boolean SSLEnabled) {
0733: this .SSLEnabled = SSLEnabled;
0734: }
0735:
0736: protected boolean secure = false;
0737:
0738: public boolean getSecure() {
0739: return secure;
0740: }
0741:
0742: public void setSecure(boolean b) {
0743: secure = b;
0744: }
0745:
0746: public void setSelectorPool(NioSelectorPool selectorPool) {
0747: this .selectorPool = selectorPool;
0748: }
0749:
0750: public void setSocketProperties(SocketProperties socketProperties) {
0751: this .socketProperties = socketProperties;
0752: }
0753:
0754: public void setUseSendfile(boolean useSendfile) {
0755:
0756: this .useSendfile = useSendfile;
0757: }
0758:
0759: public void setOomParachute(int oomParachute) {
0760: this .oomParachute = oomParachute;
0761: }
0762:
0763: public void setOomParachuteData(byte[] oomParachuteData) {
0764: this .oomParachuteData = oomParachuteData;
0765: }
0766:
0767: protected SSLContext sslContext = null;
0768:
0769: public SSLContext getSSLContext() {
0770: return sslContext;
0771: }
0772:
0773: public void setSSLContext(SSLContext c) {
0774: sslContext = c;
0775: }
0776:
0777: // --------------------------------------------------------- OOM Parachute Methods
0778:
0779: protected void checkParachute() {
0780: boolean para = reclaimParachute(false);
0781: if (!para
0782: && (System.currentTimeMillis() - lastParachuteCheck) > 10000) {
0783: try {
0784: log.fatal(oomParachuteMsg);
0785: } catch (Throwable t) {
0786: System.err.println(oomParachuteMsg);
0787: }
0788: lastParachuteCheck = System.currentTimeMillis();
0789: }
0790: }
0791:
0792: protected boolean reclaimParachute(boolean force) {
0793: if (oomParachuteData != null)
0794: return true;
0795: if (oomParachute > 0
0796: && (force || (Runtime.getRuntime().freeMemory() > (oomParachute * 2))))
0797: oomParachuteData = new byte[oomParachute];
0798: return oomParachuteData != null;
0799: }
0800:
0801: protected void releaseCaches() {
0802: this .keyCache.clear();
0803: this .nioChannels.clear();
0804: this .processorCache.clear();
0805: if (handler != null)
0806: handler.releaseCaches();
0807:
0808: }
0809:
0810: // --------------------------------------------------------- Public Methods
0811: /**
0812: * Number of keepalive sockets.
0813: */
0814: public int getKeepAliveCount() {
0815: if (pollers == null) {
0816: return 0;
0817: } else {
0818: int keepAliveCount = 0;
0819: for (int i = 0; i < pollers.length; i++) {
0820: keepAliveCount += pollers[i].getKeepAliveCount();
0821: }
0822: return keepAliveCount;
0823: }
0824: }
0825:
0826: /**
0827: * Return the amount of threads that are managed by the pool.
0828: *
0829: * @return the amount of threads that are managed by the pool
0830: */
0831: public int getCurrentThreadCount() {
0832: return curThreads;
0833: }
0834:
0835: /**
0836: * Return the amount of threads currently busy.
0837: *
0838: * @return the amount of threads currently busy
0839: */
0840: public int getCurrentThreadsBusy() {
0841: return curThreadsBusy;
0842: }
0843:
0844: /**
0845: * Return the state of the endpoint.
0846: *
0847: * @return true if the endpoint is running, false otherwise
0848: */
0849: public boolean isRunning() {
0850: return running;
0851: }
0852:
0853: /**
0854: * Return the state of the endpoint.
0855: *
0856: * @return true if the endpoint is paused, false otherwise
0857: */
0858: public boolean isPaused() {
0859: return paused;
0860: }
0861:
0862: // ----------------------------------------------- Public Lifecycle Methods
0863:
0864: /**
0865: * Initialize the endpoint.
0866: */
0867: public void init() throws Exception {
0868:
0869: if (initialized)
0870: return;
0871:
0872: serverSock = ServerSocketChannel.open();
0873: serverSock.socket().setPerformancePreferences(
0874: socketProperties.getPerformanceConnectionTime(),
0875: socketProperties.getPerformanceLatency(),
0876: socketProperties.getPerformanceBandwidth());
0877: InetSocketAddress addr = (address != null ? new InetSocketAddress(
0878: address, port)
0879: : new InetSocketAddress(port));
0880: serverSock.socket().bind(addr, backlog);
0881: serverSock.configureBlocking(true); //mimic APR behavior
0882:
0883: // Initialize thread count defaults for acceptor, poller
0884: if (acceptorThreadCount == 0) {
0885: // FIXME: Doesn't seem to work that well with multiple accept threads
0886: acceptorThreadCount = 1;
0887: }
0888: if (pollerThreadCount <= 0) {
0889: //minimum one poller thread
0890: pollerThreadCount = 1;
0891: }
0892:
0893: // Initialize SSL if needed
0894: if (isSSLEnabled()) {
0895: // Initialize SSL
0896: char[] passphrase = getKeystorePass().toCharArray();
0897:
0898: KeyStore ks = KeyStore.getInstance(getKeystoreType());
0899: ks.load(new FileInputStream(getKeystoreFile()), passphrase);
0900: KeyStore ts = KeyStore.getInstance(getKeystoreType());
0901: ts.load(new FileInputStream(getKeystoreFile()), passphrase);
0902:
0903: KeyManagerFactory kmf = KeyManagerFactory
0904: .getInstance(getAlgorithm());
0905: kmf.init(ks, passphrase);
0906:
0907: TrustManagerFactory tmf = TrustManagerFactory
0908: .getInstance(getAlgorithm());
0909: tmf.init(ts);
0910:
0911: sslContext = SSLContext.getInstance(getSslProtocol());
0912: sslContext.init(kmf.getKeyManagers(), tmf
0913: .getTrustManagers(), null);
0914:
0915: }
0916:
0917: if (oomParachute > 0)
0918: reclaimParachute(true);
0919:
0920: initialized = true;
0921:
0922: }
0923:
0924: /**
0925: * Start the NIO endpoint, creating acceptor, poller threads.
0926: */
0927: public void start() throws Exception {
0928: // Initialize socket if not done before
0929: if (!initialized) {
0930: init();
0931: }
0932: if (!running) {
0933: running = true;
0934: paused = false;
0935:
0936: // Create worker collection
0937: if (getUseExecutor()) {
0938: if (executor == null) {
0939: TaskQueue taskqueue = new TaskQueue();
0940: TaskThreadFactory tf = new TaskThreadFactory(
0941: getName() + "-exec-");
0942: executor = new ThreadPoolExecutor(
0943: getMinSpareThreads(), getMaxThreads(), 60,
0944: TimeUnit.SECONDS, taskqueue, tf);
0945: taskqueue.setParent((ThreadPoolExecutor) executor);
0946: }
0947: } else if (executor == null) {//avoid two thread pools being created
0948: workers = new WorkerStack(maxThreads);
0949: }
0950:
0951: // Start acceptor threads
0952: for (int i = 0; i < acceptorThreadCount; i++) {
0953: Thread acceptorThread = new Thread(new Acceptor(),
0954: getName() + "-Acceptor-" + i);
0955: acceptorThread.setPriority(threadPriority);
0956: acceptorThread.setDaemon(daemon);
0957: acceptorThread.start();
0958: }
0959:
0960: // Start poller threads
0961: pollers = new Poller[pollerThreadCount];
0962: for (int i = 0; i < pollerThreadCount; i++) {
0963: pollers[i] = new Poller();
0964: pollers[i].init();
0965: Thread pollerThread = new Thread(pollers[i], getName()
0966: + "-Poller-" + i);
0967: pollerThread.setPriority(threadPriority);
0968: pollerThread.setDaemon(true);
0969: pollerThread.start();
0970: }
0971: }
0972: }
0973:
0974: /**
0975: * Pause the endpoint, which will make it stop accepting new sockets.
0976: */
0977: public void pause() {
0978: if (running && !paused) {
0979: paused = true;
0980: unlockAccept();
0981: }
0982: }
0983:
0984: /**
0985: * Resume the endpoint, which will make it start accepting new sockets
0986: * again.
0987: */
0988: public void resume() {
0989: if (running) {
0990: paused = false;
0991: }
0992: }
0993:
0994: /**
0995: * Stop the endpoint. This will cause all processing threads to stop.
0996: */
0997: public void stop() {
0998: if (running) {
0999: running = false;
1000: unlockAccept();
1001: for (int i = 0; i < pollers.length; i++) {
1002: pollers[i].destroy();
1003: }
1004: pollers = null;
1005: }
1006: eventCache.clear();
1007: keyCache.clear();
1008: nioChannels.clear();
1009: processorCache.clear();
1010: if (executor != null) {
1011: if (executor instanceof ThreadPoolExecutor) {
1012: //this is our internal one, so we need to shut it down
1013: ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
1014: tpe.shutdown();
1015: TaskQueue queue = (TaskQueue) tpe.getQueue();
1016: queue.setParent(null);
1017: }
1018: executor = null;
1019: }
1020: }
1021:
1022: /**
1023: * Deallocate NIO memory pools, and close server socket.
1024: */
1025: public void destroy() throws Exception {
1026: if (running) {
1027: stop();
1028: }
1029: // Close server socket
1030: serverSock.socket().close();
1031: serverSock.close();
1032: serverSock = null;
1033: sslContext = null;
1034: initialized = false;
1035: releaseCaches();
1036: }
1037:
1038: // ------------------------------------------------------ Protected Methods
1039:
1040: /**
1041: * Get a sequence number used for thread naming.
1042: */
1043: protected int getSequence() {
1044: return sequence++;
1045: }
1046:
1047: public int getWriteBufSize() {
1048: return socketProperties.getTxBufSize();
1049: }
1050:
1051: public int getReadBufSize() {
1052: return socketProperties.getRxBufSize();
1053: }
1054:
1055: public NioSelectorPool getSelectorPool() {
1056: return selectorPool;
1057: }
1058:
1059: public SocketProperties getSocketProperties() {
1060: return socketProperties;
1061: }
1062:
1063: public boolean getUseSendfile() {
1064: //send file doesn't work with SSL
1065: return useSendfile && (!isSSLEnabled());
1066: }
1067:
1068: public int getOomParachute() {
1069: return oomParachute;
1070: }
1071:
1072: public byte[] getOomParachuteData() {
1073: return oomParachuteData;
1074: }
1075:
1076: /**
1077: * Unlock the server socket accept using a bogus connection.
1078: */
1079: protected void unlockAccept() {
1080: java.net.Socket s = null;
1081: try {
1082: // Need to create a connection to unlock the accept();
1083: if (address == null) {
1084: s = new java.net.Socket("127.0.0.1", port);
1085: } else {
1086: s = new java.net.Socket(address, port);
1087: // setting soLinger to a small value will help shutdown the
1088: // connection quicker
1089: s.setSoLinger(true, 0);
1090: }
1091: } catch (Exception e) {
1092: if (log.isDebugEnabled()) {
1093: log.debug(sm.getString("endpoint.debug.unlock", ""
1094: + port), e);
1095: }
1096: } finally {
1097: if (s != null) {
1098: try {
1099: s.close();
1100: } catch (Exception e) {
1101: // Ignore
1102: }
1103: }
1104: }
1105: }
1106:
1107: /**
1108: * Process the specified connection.
1109: */
1110: protected boolean setSocketOptions(SocketChannel socket) {
1111: // Process the connection
1112: try {
1113: //disable blocking, APR style, we are gonna be polling it
1114: socket.configureBlocking(false);
1115: Socket sock = socket.socket();
1116: socketProperties.setProperties(sock);
1117:
1118: NioChannel channel = nioChannels.poll();
1119: if (channel == null) {
1120: // SSL setup
1121: if (sslContext != null) {
1122: SSLEngine engine = createSSLEngine();
1123: int appbufsize = engine.getSession()
1124: .getApplicationBufferSize();
1125: NioBufferHandler bufhandler = new NioBufferHandler(
1126: Math.max(appbufsize, socketProperties
1127: .getAppReadBufSize()), Math.max(
1128: appbufsize, socketProperties
1129: .getAppWriteBufSize()),
1130: socketProperties.getDirectBuffer());
1131: channel = new SecureNioChannel(socket, engine,
1132: bufhandler, selectorPool);
1133: } else {
1134: // normal tcp setup
1135: NioBufferHandler bufhandler = new NioBufferHandler(
1136: socketProperties.getAppReadBufSize(),
1137: socketProperties.getAppWriteBufSize(),
1138: socketProperties.getDirectBuffer());
1139:
1140: channel = new NioChannel(socket, bufhandler);
1141: }
1142: } else {
1143: channel.setIOChannel(socket);
1144: if (channel instanceof SecureNioChannel) {
1145: SSLEngine engine = createSSLEngine();
1146: ((SecureNioChannel) channel).reset(engine);
1147: } else {
1148: channel.reset();
1149: }
1150: }
1151: getPoller0().register(channel);
1152: } catch (Throwable t) {
1153: try {
1154: log.error("", t);
1155: } catch (Throwable tt) {
1156: }
1157: // Tell to close the socket
1158: return false;
1159: }
1160: return true;
1161: }
1162:
1163: protected SSLEngine createSSLEngine() {
1164: SSLEngine engine = sslContext.createSSLEngine();
1165: engine.setNeedClientAuth(getClientAuth());
1166: engine.setUseClientMode(false);
1167: if (ciphersarr.length > 0)
1168: engine.setEnabledCipherSuites(ciphersarr);
1169: if (sslEnabledProtocolsarr.length > 0)
1170: engine.setEnabledProtocols(sslEnabledProtocolsarr);
1171:
1172: return engine;
1173: }
1174:
1175: /**
1176: * Returns true if a worker thread is available for processing.
1177: * @return boolean
1178: */
1179: protected boolean isWorkerAvailable() {
1180: if (executor != null) {
1181: return true;
1182: } else {
1183: if (workers.size() > 0) {
1184: return true;
1185: }
1186: if ((maxThreads > 0) && (curThreads < maxThreads)) {
1187: return true;
1188: } else {
1189: if (maxThreads < 0) {
1190: return true;
1191: } else {
1192: return false;
1193: }
1194: }
1195: }
1196: }
1197:
1198: /**
1199: * Create (or allocate) and return an available processor for use in
1200: * processing a specific HTTP request, if possible. If the maximum
1201: * allowed processors have already been created and are in use, return
1202: * <code>null</code> instead.
1203: */
1204: protected Worker createWorkerThread() {
1205:
1206: synchronized (workers) {
1207: if (workers.size() > 0) {
1208: curThreadsBusy++;
1209: return (workers.pop());
1210: }
1211: if ((maxThreads > 0) && (curThreads < maxThreads)) {
1212: curThreadsBusy++;
1213: return (newWorkerThread());
1214: } else {
1215: if (maxThreads < 0) {
1216: curThreadsBusy++;
1217: return (newWorkerThread());
1218: } else {
1219: return (null);
1220: }
1221: }
1222: }
1223: }
1224:
1225: /**
1226: * Create and return a new processor suitable for processing HTTP
1227: * requests and returning the corresponding responses.
1228: */
1229: protected Worker newWorkerThread() {
1230:
1231: Worker workerThread = new Worker();
1232: workerThread.start();
1233: return (workerThread);
1234:
1235: }
1236:
1237: /**
1238: * Return a new worker thread, and block while to worker is available.
1239: */
1240: protected Worker getWorkerThread() {
1241: // Allocate a new worker thread
1242: Worker workerThread = createWorkerThread();
1243: while (workerThread == null) {
1244: try {
1245: synchronized (workers) {
1246: workerThread = createWorkerThread();
1247: if (workerThread == null)
1248: workers.wait();
1249: }
1250: } catch (InterruptedException e) {
1251: // Ignore
1252: }
1253: if (workerThread == null)
1254: workerThread = createWorkerThread();
1255: }
1256: return workerThread;
1257: }
1258:
1259: /**
1260: * Recycle the specified Processor so that it can be used again.
1261: *
1262: * @param workerThread The processor to be recycled
1263: */
1264: protected void recycleWorkerThread(Worker workerThread) {
1265: synchronized (workers) {
1266: workers.push(workerThread);
1267: curThreadsBusy--;
1268: workers.notify();
1269: }
1270: }
1271:
1272: /**
1273: * Process given socket.
1274: */
1275: protected boolean processSocket(NioChannel socket) {
1276: return processSocket(socket, null);
1277: }
1278:
1279: /**
1280: * Process given socket for an event.
1281: */
1282: protected boolean processSocket(NioChannel socket,
1283: SocketStatus status) {
1284: return processSocket(socket, status, true);
1285: }
1286:
1287: protected boolean processSocket(NioChannel socket,
1288: SocketStatus status, boolean dispatch) {
1289: try {
1290: if (executor == null) {
1291: getWorkerThread().assign(socket, status);
1292: } else {
1293: SocketProcessor sc = processorCache.poll();
1294: if (sc == null)
1295: sc = new SocketProcessor(socket, status);
1296: else
1297: sc.reset(socket, status);
1298: if (dispatch)
1299: executor.execute(sc);
1300: else
1301: sc.run();
1302: }
1303: } catch (Throwable t) {
1304: // This means we got an OOM or similar creating a thread, or that
1305: // the pool and its queue are full
1306: log.error(sm.getString("endpoint.process.fail"), t);
1307: return false;
1308: }
1309: return true;
1310: }
1311:
1312: // --------------------------------------------------- Acceptor Inner Class
1313:
1314: /**
1315: * Server socket acceptor thread.
1316: */
1317: protected class Acceptor implements Runnable {
1318: /**
1319: * The background thread that listens for incoming TCP/IP connections and
1320: * hands them off to an appropriate processor.
1321: */
1322: public void run() {
1323: // Loop until we receive a shutdown command
1324: while (running) {
1325: // Loop if endpoint is paused
1326: while (paused) {
1327: try {
1328: Thread.sleep(1000);
1329: } catch (InterruptedException e) {
1330: // Ignore
1331: }
1332: }
1333: try {
1334: // Accept the next incoming connection from the server socket
1335: SocketChannel socket = serverSock.accept();
1336: // Hand this socket off to an appropriate processor
1337: //TODO FIXME - this is currently a blocking call, meaning we will be blocking
1338: //further accepts until there is a thread available.
1339: if (running && (!paused) && socket != null) {
1340: //processSocket(socket);
1341: if (!setSocketOptions(socket)) {
1342: try {
1343: socket.socket().close();
1344: socket.close();
1345: } catch (IOException ix) {
1346: if (log.isDebugEnabled())
1347: log.debug("", ix);
1348: }
1349: }
1350: }
1351: } catch (IOException x) {
1352: if (running)
1353: log.error(sm.getString("endpoint.accept.fail"),
1354: x);
1355: } catch (OutOfMemoryError oom) {
1356: try {
1357: oomParachuteData = null;
1358: releaseCaches();
1359: log.error("", oom);
1360: } catch (Throwable oomt) {
1361: try {
1362: try {
1363: System.err.println(oomParachuteMsg);
1364: oomt.printStackTrace();
1365: } catch (Throwable letsHopeWeDontGetHere) {
1366: }
1367: } catch (Throwable letsHopeWeDontGetHere) {
1368: }
1369: }
1370: } catch (Throwable t) {
1371: log.error(sm.getString("endpoint.accept.fail"), t);
1372: }
1373: }//while
1374: }//run
1375: }
1376:
1377: // ----------------------------------------------------- Poller Inner Classes
1378:
1379: /**
1380: *
1381: * PollerEvent, cacheable object for poller events to avoid GC
1382: */
1383: public class PollerEvent implements Runnable {
1384:
1385: protected NioChannel socket;
1386: protected int interestOps;
1387: protected KeyAttachment key;
1388:
1389: public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
1390: reset(ch, k, intOps);
1391: }
1392:
1393: public void reset(NioChannel ch, KeyAttachment k, int intOps) {
1394: socket = ch;
1395: interestOps = intOps;
1396: key = k;
1397: }
1398:
1399: public void reset() {
1400: reset(null, null, 0);
1401: }
1402:
1403: public void run() {
1404: if (interestOps == OP_REGISTER) {
1405: try {
1406: socket.getIOChannel().register(
1407: socket.getPoller().getSelector(),
1408: SelectionKey.OP_READ, key);
1409: } catch (Exception x) {
1410: log.error("", x);
1411: }
1412: } else {
1413: final SelectionKey key = socket.getIOChannel().keyFor(
1414: socket.getPoller().getSelector());
1415: try {
1416: boolean cancel = false;
1417: if (key != null) {
1418: final KeyAttachment att = (KeyAttachment) key
1419: .attachment();
1420: if (att != null) {
1421: att.access();//to prevent timeout
1422: //we are registering the key to start with, reset the fairness counter.
1423: att.setFairness(0);
1424: att.interestOps(interestOps);
1425: key.interestOps(interestOps);
1426: } else {
1427: cancel = true;
1428: }
1429: } else {
1430: cancel = true;
1431: }
1432: if (cancel)
1433: getPoller0().cancelledKey(key,
1434: SocketStatus.ERROR, false);
1435: } catch (CancelledKeyException ckx) {
1436: try {
1437: getPoller0().cancelledKey(key,
1438: SocketStatus.DISCONNECT, true);
1439: } catch (Exception ignore) {
1440: }
1441: }
1442: }//end if
1443: }//run
1444:
1445: public String toString() {
1446: return super .toString() + "[intOps=" + this .interestOps
1447: + "]";
1448: }
1449: }
1450:
1451: /**
1452: * Poller class.
1453: */
1454: public class Poller implements Runnable {
1455:
1456: protected Selector selector;
1457: protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
1458:
1459: protected boolean close = false;
1460: protected long nextExpiration = 0;//optimize expiration handling
1461:
1462: protected int keepAliveCount = 0;
1463:
1464: public int getKeepAliveCount() {
1465: return keepAliveCount;
1466: }
1467:
1468: protected AtomicLong wakeupCounter = new AtomicLong(0l);
1469:
1470: protected CountDownLatch stopLatch = new CountDownLatch(1);
1471:
1472: public Poller() throws IOException {
1473: this .selector = Selector.open();
1474: }
1475:
1476: public Selector getSelector() {
1477: return selector;
1478: }
1479:
1480: /**
1481: * Create the poller. With some versions of APR, the maximum poller size will
1482: * be 62 (reocmpiling APR is necessary to remove this limitation).
1483: */
1484: protected void init() {
1485: keepAliveCount = 0;
1486: }
1487:
1488: /**
1489: * Destroy the poller.
1490: */
1491: protected void destroy() {
1492: // Wait for polltime before doing anything, so that the poller threads
1493: // exit, otherwise parallel descturction of sockets which are still
1494: // in the poller can cause problems
1495: close = true;
1496: events.clear();
1497: selector.wakeup();
1498: try {
1499: stopLatch.await(5, TimeUnit.SECONDS);
1500: } catch (InterruptedException ignore) {
1501: }
1502: }
1503:
1504: public void addEvent(Runnable event) {
1505: events.offer(event);
1506: if (wakeupCounter.incrementAndGet() < 3)
1507: selector.wakeup();
1508: }
1509:
1510: /**
1511: * Add specified socket and associated pool to the poller. The socket will
1512: * be added to a temporary array, and polled first after a maximum amount
1513: * of time equal to pollTime (in most cases, latency will be much lower,
1514: * however).
1515: *
1516: * @param socket to add to the poller
1517: */
1518: public void add(final NioChannel socket) {
1519: add(socket, SelectionKey.OP_READ);
1520: }
1521:
1522: public void add(final NioChannel socket, final int interestOps) {
1523: PollerEvent r = eventCache.poll();
1524: if (r == null)
1525: r = new PollerEvent(socket, null, interestOps);
1526: else
1527: r.reset(socket, null, interestOps);
1528: addEvent(r);
1529: }
1530:
1531: public boolean events() {
1532: boolean result = false;
1533: //synchronized (events) {
1534: Runnable r = null;
1535: result = (events.size() > 0);
1536: while ((r = (Runnable) events.poll()) != null) {
1537: try {
1538: r.run();
1539: if (r instanceof PollerEvent) {
1540: ((PollerEvent) r).reset();
1541: eventCache.offer((PollerEvent) r);
1542: }
1543: } catch (Throwable x) {
1544: log.error("", x);
1545: }
1546: }
1547: //events.clear();
1548: //}
1549: return result;
1550: }
1551:
1552: public void register(final NioChannel socket) {
1553: socket.setPoller(this );
1554: KeyAttachment key = keyCache.poll();
1555: final KeyAttachment ka = key != null ? key
1556: : new KeyAttachment();
1557: ka.reset(this , socket);
1558: PollerEvent r = eventCache.poll();
1559: ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
1560: if (r == null)
1561: r = new PollerEvent(socket, ka, OP_REGISTER);
1562: else
1563: r.reset(socket, ka, OP_REGISTER);
1564: addEvent(r);
1565: }
1566:
1567: public void cancelledKey(SelectionKey key, SocketStatus status,
1568: boolean dispatch) {
1569: try {
1570: if (key == null)
1571: return;//nothing to do
1572: KeyAttachment ka = (KeyAttachment) key.attachment();
1573: if (ka != null && ka.getComet() && status != null) {
1574: //the comet event takes care of clean up
1575: //processSocket(ka.getChannel(), status, dispatch);
1576: ka.setComet(false);//to avoid a loop
1577: processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key
1578: if (status == SocketStatus.TIMEOUT)
1579: return; // don't close on comet timeout
1580: }
1581: if (key.isValid())
1582: key.cancel();
1583: if (key.channel().isOpen())
1584: try {
1585: key.channel().close();
1586: } catch (Exception ignore) {
1587: }
1588: try {
1589: ka.channel.close(true);
1590: } catch (Exception ignore) {
1591: }
1592: key.attach(null);
1593: } catch (Throwable e) {
1594: if (log.isDebugEnabled())
1595: log.error("", e);
1596: // Ignore
1597: }
1598: }
1599:
1600: /**
1601: * The background thread that listens for incoming TCP/IP connections and
1602: * hands them off to an appropriate processor.
1603: */
1604: public void run() {
1605: // Loop until we receive a shutdown command
1606: while (running) {
1607: try {
1608: // Loop if endpoint is paused
1609: while (paused && (!close)) {
1610: try {
1611: Thread.sleep(500);
1612: } catch (InterruptedException e) {
1613: // Ignore
1614: }
1615: }
1616: boolean hasEvents = false;
1617:
1618: hasEvents = (hasEvents | events());
1619: // Time to terminate?
1620: if (close) {
1621: timeout(0, false);
1622: stopLatch.countDown();
1623: return;
1624: }
1625: int keyCount = 0;
1626: try {
1627: if (!close) {
1628: keyCount = selector.select(selectorTimeout);
1629: wakeupCounter.set(0);
1630: }
1631: if (close) {
1632: timeout(0, false);
1633: stopLatch.countDown();
1634: selector.close();
1635: return;
1636: }
1637: } catch (NullPointerException x) {
1638: //sun bug 5076772 on windows JDK 1.5
1639: if (wakeupCounter == null || selector == null)
1640: throw x;
1641: continue;
1642: } catch (CancelledKeyException x) {
1643: //sun bug 5076772 on windows JDK 1.5
1644: if (wakeupCounter == null || selector == null)
1645: throw x;
1646: continue;
1647: } catch (Throwable x) {
1648: log.error("", x);
1649: continue;
1650: }
1651: //either we timed out or we woke up, process events first
1652: if (keyCount == 0)
1653: hasEvents = (hasEvents | events());
1654:
1655: Iterator iterator = keyCount > 0 ? selector
1656: .selectedKeys().iterator() : null;
1657: // Walk through the collection of ready keys and dispatch
1658: // any active event.
1659: while (iterator != null && iterator.hasNext()) {
1660: SelectionKey sk = (SelectionKey) iterator
1661: .next();
1662: KeyAttachment attachment = (KeyAttachment) sk
1663: .attachment();
1664: iterator.remove();
1665: processKey(sk, attachment);
1666: }//while
1667:
1668: //process timeouts
1669: timeout(keyCount, hasEvents);
1670: if (oomParachute > 0 && oomParachuteData == null)
1671: checkParachute();
1672: } catch (OutOfMemoryError oom) {
1673: try {
1674: oomParachuteData = null;
1675: releaseCaches();
1676: log.error("", oom);
1677: } catch (Throwable oomt) {
1678: try {
1679: System.err.println(oomParachuteMsg);
1680: oomt.printStackTrace();
1681: } catch (Throwable letsHopeWeDontGetHere) {
1682: }
1683: }
1684: }
1685: }//while
1686: synchronized (this ) {
1687: this .notifyAll();
1688: }
1689: stopLatch.countDown();
1690:
1691: }
1692:
1693: protected boolean processKey(SelectionKey sk,
1694: KeyAttachment attachment) {
1695: boolean result = true;
1696: try {
1697: if (close) {
1698: cancelledKey(sk, SocketStatus.STOP, false);
1699: } else if (sk.isValid() && attachment != null) {
1700: attachment.access();//make sure we don't time out valid sockets
1701: sk.attach(attachment);//cant remember why this is here
1702: NioChannel channel = attachment.getChannel();
1703: if (sk.isReadable() || sk.isWritable()) {
1704: if (sk.isReadable()
1705: && attachment.getReadLatch() != null) {
1706: unreg(sk, attachment, SelectionKey.OP_READ);
1707: attachment.getReadLatch().countDown();
1708: } else if (sk.isWritable()
1709: && attachment.getWriteLatch() != null) {
1710: unreg(sk, attachment, SelectionKey.OP_WRITE);
1711: attachment.getWriteLatch().countDown();
1712: } else if (attachment.getSendfileData() != null) {
1713: processSendfile(sk, attachment, true);
1714: } else if (attachment.getComet()) {
1715: //check if thread is available
1716: if (isWorkerAvailable()) {
1717: unreg(sk, attachment, sk.readyOps());
1718: if (!processSocket(channel,
1719: SocketStatus.OPEN))
1720: processSocket(channel,
1721: SocketStatus.DISCONNECT);
1722: attachment.setFairness(0);
1723: } else {
1724: //increase the fairness counter
1725: attachment.incFairness();
1726: result = false;
1727: }
1728: } else {
1729: //later on, improve latch behavior
1730: if (isWorkerAvailable()) {
1731: unreg(sk, attachment, sk.readyOps());
1732: boolean close = (!processSocket(channel));
1733: if (close) {
1734: cancelledKey(sk,
1735: SocketStatus.DISCONNECT,
1736: false);
1737: }
1738: attachment.setFairness(0);
1739: } else {
1740: //increase the fairness counter
1741: attachment.incFairness();
1742: result = false;
1743: }
1744: }
1745: }
1746: } else {
1747: //invalid key
1748: cancelledKey(sk, SocketStatus.ERROR, false);
1749: }
1750: } catch (CancelledKeyException ckx) {
1751: cancelledKey(sk, SocketStatus.ERROR, false);
1752: } catch (Throwable t) {
1753: log.error("", t);
1754: }
1755: return result;
1756: }
1757:
1758: public boolean processSendfile(SelectionKey sk,
1759: KeyAttachment attachment, boolean reg) {
1760: try {
1761: //unreg(sk,attachment);//only do this if we do process send file on a separate thread
1762: SendfileData sd = attachment.getSendfileData();
1763: if (sd.fchannel == null) {
1764: File f = new File(sd.fileName);
1765: if (!f.exists()) {
1766: cancelledKey(sk, SocketStatus.ERROR, false);
1767: return false;
1768: }
1769: sd.fchannel = new FileInputStream(f).getChannel();
1770: }
1771: SocketChannel sc = attachment.getChannel()
1772: .getIOChannel();
1773: long written = sd.fchannel.transferTo(sd.pos,
1774: sd.length, sc);
1775: if (written > 0) {
1776: sd.pos += written;
1777: sd.length -= written;
1778: }
1779: if (sd.length <= 0) {
1780: attachment.setSendfileData(null);
1781: if (sd.keepAlive)
1782: if (reg)
1783: reg(sk, attachment, SelectionKey.OP_READ);
1784: else
1785: cancelledKey(sk, SocketStatus.STOP, false);
1786: } else if (attachment.interestOps() == 0 && reg) {
1787: reg(sk, attachment, SelectionKey.OP_WRITE);
1788: }
1789: } catch (IOException x) {
1790: if (log.isDebugEnabled())
1791: log.warn("Unable to complete sendfile request:", x);
1792: cancelledKey(sk, SocketStatus.ERROR, false);
1793: return false;
1794: } catch (Throwable t) {
1795: log.error("", t);
1796: cancelledKey(sk, SocketStatus.ERROR, false);
1797: return false;
1798: }
1799: return true;
1800: }
1801:
1802: protected void unreg(SelectionKey sk, KeyAttachment attachment,
1803: int readyOps) {
1804: //this is a must, so that we don't have multiple threads messing with the socket
1805: reg(sk, attachment, sk.interestOps() & (~readyOps));
1806: }
1807:
1808: protected void reg(SelectionKey sk, KeyAttachment attachment,
1809: int intops) {
1810: sk.interestOps(intops);
1811: attachment.interestOps(intops);
1812: }
1813:
1814: protected void timeout(int keyCount, boolean hasEvents) {
1815: long now = System.currentTimeMillis();
1816: //don't process timeouts too frequently, but if the selector simply timed out
1817: //then we can check timeouts to avoid gaps
1818: if ((now < nextExpiration) && (keyCount > 0 || hasEvents)
1819: && (!close))
1820: return;
1821: nextExpiration = now
1822: + (long) socketProperties.getSoTimeout();
1823: //timeout
1824: Set<SelectionKey> keys = selector.keys();
1825: int keycount = 0;
1826: for (Iterator<SelectionKey> iter = keys.iterator(); iter
1827: .hasNext();) {
1828: SelectionKey key = iter.next();
1829: keycount++;
1830: try {
1831: KeyAttachment ka = (KeyAttachment) key.attachment();
1832: if (ka == null) {
1833: cancelledKey(key, SocketStatus.ERROR, false); //we don't support any keys without attachments
1834: } else if (ka.getError()) {
1835: cancelledKey(key, SocketStatus.ERROR, true);
1836: } else if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
1837: //only timeout sockets that we are waiting for a read from
1838: long delta = now - ka.getLastAccess();
1839: long timeout = (ka.getTimeout() == -1) ? ((long) socketProperties
1840: .getSoTimeout())
1841: : (ka.getTimeout());
1842: boolean isTimedout = delta > timeout;
1843: if (close) {
1844: key.interestOps(0);
1845: ka.interestOps(0); //avoid duplicate stop calls
1846: processKey(key, ka);
1847: } else if (isTimedout) {
1848: key.interestOps(0);
1849: ka.interestOps(0); //avoid duplicate timeout calls
1850: cancelledKey(key, SocketStatus.TIMEOUT,
1851: true);
1852: } else {
1853: long nextTime = now + (timeout - delta);
1854: nextExpiration = (nextTime < nextExpiration) ? nextTime
1855: : nextExpiration;
1856: }
1857: }//end if
1858: } catch (CancelledKeyException ckx) {
1859: cancelledKey(key, SocketStatus.ERROR, false);
1860: }
1861: }//for
1862: if (log.isDebugEnabled())
1863: log.debug("Poller processed " + keycount
1864: + " keys through timeout");
1865: }
1866: }
1867:
1868: // ----------------------------------------------------- Key Attachment Class
1869: public static class KeyAttachment {
1870:
1871: public KeyAttachment() {
1872:
1873: }
1874:
1875: public void reset(Poller poller, NioChannel channel) {
1876: this .channel = channel;
1877: this .poller = poller;
1878: lastAccess = System.currentTimeMillis();
1879: currentAccess = false;
1880: comet = false;
1881: timeout = -1;
1882: error = false;
1883: fairness = 0;
1884: lastRegistered = 0;
1885: sendfileData = null;
1886: if (readLatch != null)
1887: try {
1888: for (int i = 0; i < (int) readLatch.getCount(); i++)
1889: readLatch.countDown();
1890: } catch (Exception ignore) {
1891: }
1892: readLatch = null;
1893: if (writeLatch != null)
1894: try {
1895: for (int i = 0; i < (int) writeLatch.getCount(); i++)
1896: writeLatch.countDown();
1897: } catch (Exception ignore) {
1898: }
1899: writeLatch = null;
1900: }
1901:
1902: public void reset() {
1903: reset(null, null);
1904: }
1905:
1906: public Poller getPoller() {
1907: return poller;
1908: }
1909:
1910: public void setPoller(Poller poller) {
1911: this .poller = poller;
1912: }
1913:
1914: public long getLastAccess() {
1915: return lastAccess;
1916: }
1917:
1918: public void access() {
1919: access(System.currentTimeMillis());
1920: }
1921:
1922: public void access(long access) {
1923: lastAccess = access;
1924: }
1925:
1926: public void setComet(boolean comet) {
1927: this .comet = comet;
1928: }
1929:
1930: public boolean getComet() {
1931: return comet;
1932: }
1933:
1934: public boolean getCurrentAccess() {
1935: return currentAccess;
1936: }
1937:
1938: public void setCurrentAccess(boolean access) {
1939: currentAccess = access;
1940: }
1941:
1942: public Object getMutex() {
1943: return mutex;
1944: }
1945:
1946: public void setTimeout(long timeout) {
1947: this .timeout = timeout;
1948: }
1949:
1950: public long getTimeout() {
1951: return this .timeout;
1952: }
1953:
1954: public boolean getError() {
1955: return error;
1956: }
1957:
1958: public void setError(boolean error) {
1959: this .error = error;
1960: }
1961:
1962: public NioChannel getChannel() {
1963: return channel;
1964: }
1965:
1966: public void setChannel(NioChannel channel) {
1967: this .channel = channel;
1968: }
1969:
1970: protected Poller poller = null;
1971: protected int interestOps = 0;
1972:
1973: public int interestOps() {
1974: return interestOps;
1975: }
1976:
1977: public int interestOps(int ops) {
1978: this .interestOps = ops;
1979: return ops;
1980: }
1981:
1982: public CountDownLatch getReadLatch() {
1983: return readLatch;
1984: }
1985:
1986: public CountDownLatch getWriteLatch() {
1987: return writeLatch;
1988: }
1989:
1990: protected CountDownLatch resetLatch(CountDownLatch latch) {
1991: if (latch.getCount() == 0)
1992: return null;
1993: else
1994: throw new IllegalStateException(
1995: "Latch must be at count 0");
1996: }
1997:
1998: public void resetReadLatch() {
1999: readLatch = resetLatch(readLatch);
2000: }
2001:
2002: public void resetWriteLatch() {
2003: writeLatch = resetLatch(writeLatch);
2004: }
2005:
2006: protected CountDownLatch startLatch(CountDownLatch latch,
2007: int cnt) {
2008: if (latch == null || latch.getCount() == 0) {
2009: return new CountDownLatch(cnt);
2010: } else
2011: throw new IllegalStateException(
2012: "Latch must be at count 0 or null.");
2013: }
2014:
2015: public void startReadLatch(int cnt) {
2016: readLatch = startLatch(readLatch, cnt);
2017: }
2018:
2019: public void startWriteLatch(int cnt) {
2020: writeLatch = startLatch(writeLatch, cnt);
2021: }
2022:
2023: protected void awaitLatch(CountDownLatch latch, long timeout,
2024: TimeUnit unit) throws InterruptedException {
2025: if (latch == null)
2026: throw new IllegalStateException("Latch cannot be null");
2027: latch.await(timeout, unit);
2028: }
2029:
2030: public void awaitReadLatch(long timeout, TimeUnit unit)
2031: throws InterruptedException {
2032: awaitLatch(readLatch, timeout, unit);
2033: }
2034:
2035: public void awaitWriteLatch(long timeout, TimeUnit unit)
2036: throws InterruptedException {
2037: awaitLatch(writeLatch, timeout, unit);
2038: }
2039:
2040: public int getFairness() {
2041: return fairness;
2042: }
2043:
2044: public void setFairness(int f) {
2045: fairness = f;
2046: }
2047:
2048: public void incFairness() {
2049: fairness++;
2050: }
2051:
2052: public long getLastRegistered() {
2053: return lastRegistered;
2054: };
2055:
2056: public void setLastRegistered(long reg) {
2057: lastRegistered = reg;
2058: }
2059:
2060: public void setSendfileData(SendfileData sf) {
2061: this .sendfileData = sf;
2062: }
2063:
2064: public SendfileData getSendfileData() {
2065: return this .sendfileData;
2066: }
2067:
2068: protected Object mutex = new Object();
2069: protected long lastAccess = -1;
2070: protected boolean currentAccess = false;
2071: protected boolean comet = false;
2072: protected long timeout = -1;
2073: protected boolean error = false;
2074: protected NioChannel channel = null;
2075: protected CountDownLatch readLatch = null;
2076: protected CountDownLatch writeLatch = null;
2077: protected int fairness = 0;
2078: protected long lastRegistered = 0;
2079: protected SendfileData sendfileData = null;
2080: }
2081:
2082: // ----------------------------------------------------- Worker Inner Class
2083:
2084: /**
2085: * Server processor class.
2086: */
2087: protected class Worker implements Runnable {
2088:
2089: protected Thread thread = null;
2090: protected boolean available = false;
2091: protected Object socket = null;
2092: protected SocketStatus status = null;
2093:
2094: /**
2095: * Process an incoming TCP/IP connection on the specified socket. Any
2096: * exception that occurs during processing must be logged and swallowed.
2097: * <b>NOTE</b>: This method is called from our Connector's thread. We
2098: * must assign it to our own thread so that multiple simultaneous
2099: * requests can be handled.
2100: *
2101: * @param socket TCP socket to process
2102: */
2103: protected synchronized void assign(Object socket) {
2104:
2105: // Wait for the Processor to get the previous Socket
2106: while (available) {
2107: try {
2108: wait();
2109: } catch (InterruptedException e) {
2110: }
2111: }
2112: // Store the newly available Socket and notify our thread
2113: this .socket = socket;
2114: status = null;
2115: available = true;
2116: notifyAll();
2117:
2118: }
2119:
2120: protected synchronized void assign(Object socket,
2121: SocketStatus status) {
2122:
2123: // Wait for the Processor to get the previous Socket
2124: while (available) {
2125: try {
2126: wait();
2127: } catch (InterruptedException e) {
2128: }
2129: }
2130:
2131: // Store the newly available Socket and notify our thread
2132: this .socket = socket;
2133: this .status = status;
2134: available = true;
2135: notifyAll();
2136: }
2137:
2138: /**
2139: * Await a newly assigned Socket from our Connector, or <code>null</code>
2140: * if we are supposed to shut down.
2141: */
2142: protected synchronized Object await() {
2143:
2144: // Wait for the Connector to provide a new Socket
2145: while (!available) {
2146: try {
2147: wait();
2148: } catch (InterruptedException e) {
2149: }
2150: }
2151:
2152: // Notify the Connector that we have received this Socket
2153: Object socket = this .socket;
2154: available = false;
2155: notifyAll();
2156:
2157: return (socket);
2158:
2159: }
2160:
2161: /**
2162: * The background thread that listens for incoming TCP/IP connections and
2163: * hands them off to an appropriate processor.
2164: */
2165: public void run() {
2166:
2167: // Process requests until we receive a shutdown signal
2168: while (running) {
2169: NioChannel socket = null;
2170: SelectionKey key = null;
2171: try {
2172: // Wait for the next socket to be assigned
2173: Object channel = await();
2174: if (channel == null)
2175: continue;
2176:
2177: if (channel instanceof SocketChannel) {
2178: SocketChannel sc = (SocketChannel) channel;
2179: if (!setSocketOptions(sc)) {
2180: try {
2181: sc.socket().close();
2182: sc.close();
2183: } catch (IOException ix) {
2184: if (log.isDebugEnabled())
2185: log.debug("", ix);
2186: }
2187: } else {
2188: //now we have it registered, remove it from the cache
2189:
2190: }
2191: } else {
2192: socket = (NioChannel) channel;
2193: SocketProcessor sc = processorCache.poll();
2194: if (sc == null)
2195: sc = new SocketProcessor(socket, status);
2196: else
2197: sc.reset(socket, status);
2198: sc.run();
2199: }
2200: } catch (CancelledKeyException cx) {
2201: if (socket != null && key != null)
2202: socket.getPoller().cancelledKey(key, null,
2203: false);
2204: } catch (OutOfMemoryError oom) {
2205: try {
2206: oomParachuteData = null;
2207: releaseCaches();
2208: log.error("", oom);
2209: } catch (Throwable oomt) {
2210: try {
2211: System.err.println(oomParachuteMsg);
2212: oomt.printStackTrace();
2213: } catch (Throwable letsHopeWeDontGetHere) {
2214: }
2215: }
2216: } finally {
2217: //dereference socket to let GC do its job
2218: socket = null;
2219: // Finish up this request
2220: recycleWorkerThread(this );
2221: }
2222: }
2223: }
2224:
2225: /**
2226: * Start the background processing thread.
2227: */
2228: public void start() {
2229: thread = new Thread(this );
2230: thread.setName(getName() + "-" + (++curThreads));
2231: thread.setDaemon(true);
2232: thread.setPriority(getThreadPriority());
2233: thread.start();
2234: }
2235:
2236: }
2237:
2238: // ------------------------------------------------ Application Buffer Handler
2239: public class NioBufferHandler implements ApplicationBufferHandler {
2240: protected ByteBuffer readbuf = null;
2241: protected ByteBuffer writebuf = null;
2242:
2243: public NioBufferHandler(int readsize, int writesize,
2244: boolean direct) {
2245: if (direct) {
2246: readbuf = ByteBuffer.allocateDirect(readsize);
2247: writebuf = ByteBuffer.allocateDirect(writesize);
2248: } else {
2249: readbuf = ByteBuffer.allocate(readsize);
2250: writebuf = ByteBuffer.allocate(writesize);
2251: }
2252: }
2253:
2254: public ByteBuffer expand(ByteBuffer buffer, int remaining) {
2255: return buffer;
2256: }
2257:
2258: public ByteBuffer getReadBuffer() {
2259: return readbuf;
2260: }
2261:
2262: public ByteBuffer getWriteBuffer() {
2263: return writebuf;
2264: }
2265:
2266: }
2267:
2268: // ------------------------------------------------ Handler Inner Interface
2269:
2270: /**
2271: * Bare bones interface used for socket processing. Per thread data is to be
2272: * stored in the ThreadWithAttributes extra folders, or alternately in
2273: * thread local fields.
2274: */
2275: public interface Handler {
2276: public enum SocketState {
2277: OPEN, CLOSED, LONG
2278: }
2279:
2280: public SocketState process(NioChannel socket);
2281:
2282: public SocketState event(NioChannel socket, SocketStatus status);
2283:
2284: public void releaseCaches();
2285: }
2286:
2287: // ------------------------------------------------- WorkerStack Inner Class
2288:
2289: public class WorkerStack {
2290:
2291: protected Worker[] workers = null;
2292: protected int end = 0;
2293:
2294: public WorkerStack(int size) {
2295: workers = new Worker[size];
2296: }
2297:
2298: /**
2299: * Put the object into the queue.
2300: *
2301: * @param object the object to be appended to the queue (first element).
2302: */
2303: public void push(Worker worker) {
2304: workers[end++] = worker;
2305: }
2306:
2307: /**
2308: * Get the first object out of the queue. Return null if the queue
2309: * is empty.
2310: */
2311: public Worker pop() {
2312: if (end > 0) {
2313: return workers[--end];
2314: }
2315: return null;
2316: }
2317:
2318: /**
2319: * Get the first object out of the queue, Return null if the queue
2320: * is empty.
2321: */
2322: public Worker peek() {
2323: return workers[end];
2324: }
2325:
2326: /**
2327: * Is the queue empty?
2328: */
2329: public boolean isEmpty() {
2330: return (end == 0);
2331: }
2332:
2333: /**
2334: * How many elements are there in this queue?
2335: */
2336: public int size() {
2337: return (end);
2338: }
2339: }
2340:
2341: // ---------------------------------------------- SocketProcessor Inner Class
2342:
2343: /**
2344: * This class is the equivalent of the Worker, but will simply use in an
2345: * external Executor thread pool.
2346: */
2347: protected class SocketProcessor implements Runnable {
2348:
2349: protected NioChannel socket = null;
2350: protected SocketStatus status = null;
2351:
2352: public SocketProcessor(NioChannel socket, SocketStatus status) {
2353: reset(socket, status);
2354: }
2355:
2356: public void reset(NioChannel socket, SocketStatus status) {
2357: this .socket = socket;
2358: this .status = status;
2359: }
2360:
2361: public void run() {
2362: SelectionKey key = null;
2363: try {
2364: key = socket.getIOChannel().keyFor(
2365: socket.getPoller().getSelector());
2366: int handshake = -1;
2367:
2368: try {
2369: if (key != null)
2370: handshake = socket.handshake(key.isReadable(),
2371: key.isWritable());
2372: } catch (IOException x) {
2373: handshake = -1;
2374: if (log.isDebugEnabled())
2375: log.debug("Error during SSL handshake", x);
2376: } catch (CancelledKeyException ckx) {
2377: handshake = -1;
2378: }
2379: if (handshake == 0) {
2380: // Process the request from this socket
2381: boolean closed = (status == null) ? (handler
2382: .process(socket) == Handler.SocketState.CLOSED)
2383: : (handler.event(socket, status) == Handler.SocketState.CLOSED);
2384:
2385: if (closed) {
2386: // Close socket and pool
2387: try {
2388: KeyAttachment ka = null;
2389: if (key != null) {
2390: ka = (KeyAttachment) key.attachment();
2391: if (ka != null)
2392: ka.setComet(false);
2393: socket.getPoller().cancelledKey(key,
2394: SocketStatus.ERROR, false);
2395: }
2396: if (socket != null)
2397: nioChannels.offer(socket);
2398: socket = null;
2399: if (ka != null)
2400: keyCache.offer(ka);
2401: ka = null;
2402: } catch (Exception x) {
2403: log.error("", x);
2404: }
2405: }
2406: } else if (handshake == -1) {
2407: KeyAttachment ka = null;
2408: if (key != null) {
2409: ka = (KeyAttachment) key.attachment();
2410: socket.getPoller().cancelledKey(key,
2411: SocketStatus.DISCONNECT, false);
2412: }
2413: if (socket != null)
2414: nioChannels.offer(socket);
2415: socket = null;
2416: if (ka != null)
2417: keyCache.offer(ka);
2418: ka = null;
2419: } else {
2420: final SelectionKey fk = key;
2421: final int intops = handshake;
2422: final KeyAttachment ka = (KeyAttachment) fk
2423: .attachment();
2424: ka.getPoller().add(socket, intops);
2425: }
2426: } catch (CancelledKeyException cx) {
2427: socket.getPoller().cancelledKey(key, null, false);
2428: } catch (OutOfMemoryError oom) {
2429: try {
2430: oomParachuteData = null;
2431: socket.getPoller().cancelledKey(key,
2432: SocketStatus.ERROR, false);
2433: releaseCaches();
2434: log.error("", oom);
2435: } catch (Throwable oomt) {
2436: try {
2437: System.err.println(oomParachuteMsg);
2438: oomt.printStackTrace();
2439: } catch (Throwable letsHopeWeDontGetHere) {
2440: }
2441: }
2442: } catch (Throwable t) {
2443: log.error("", t);
2444: socket.getPoller().cancelledKey(key,
2445: SocketStatus.ERROR, false);
2446: } finally {
2447: socket = null;
2448: status = null;
2449: //return to cache
2450: processorCache.offer(this );
2451: }
2452: }
2453:
2454: }
2455:
2456: // ---------------------------------------------- TaskQueue Inner Class
2457: public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
2458: ThreadPoolExecutor parent = null;
2459:
2460: public TaskQueue() {
2461: super ();
2462: }
2463:
2464: public TaskQueue(int initialCapacity) {
2465: super (initialCapacity);
2466: }
2467:
2468: public TaskQueue(Collection<? extends Runnable> c) {
2469: super (c);
2470: }
2471:
2472: public void setParent(ThreadPoolExecutor tp) {
2473: parent = tp;
2474: }
2475:
2476: public boolean offer(Runnable o) {
2477: //we can't do any checks
2478: if (parent == null)
2479: return super .offer(o);
2480: //we are maxed out on threads, simply queue the object
2481: if (parent.getPoolSize() == parent.getMaximumPoolSize())
2482: return super .offer(o);
2483: //we have idle threads, just add it to the queue
2484: //this is an approximation, so it could use some tuning
2485: if (parent.getActiveCount() < (parent.getPoolSize()))
2486: return super .offer(o);
2487: //if we have less threads than maximum force creation of a new thread
2488: if (parent.getPoolSize() < parent.getMaximumPoolSize())
2489: return false;
2490: //if we reached here, we need to add it to the queue
2491: return super .offer(o);
2492: }
2493: }
2494:
2495: // ---------------------------------------------- ThreadFactory Inner Class
2496: class TaskThreadFactory implements ThreadFactory {
2497: final ThreadGroup group;
2498: final AtomicInteger threadNumber = new AtomicInteger(1);
2499: final String namePrefix;
2500:
2501: TaskThreadFactory(String namePrefix) {
2502: SecurityManager s = System.getSecurityManager();
2503: group = (s != null) ? s.getThreadGroup() : Thread
2504: .currentThread().getThreadGroup();
2505: this .namePrefix = namePrefix;
2506: }
2507:
2508: public Thread newThread(Runnable r) {
2509: Thread t = new Thread(group, r, namePrefix
2510: + threadNumber.getAndIncrement());
2511: t.setDaemon(daemon);
2512: t.setPriority(getThreadPriority());
2513: return t;
2514: }
2515: }
2516:
2517: // ----------------------------------------------- SendfileData Inner Class
2518:
2519: /**
2520: * SendfileData class.
2521: */
2522: public static class SendfileData {
2523: // File
2524: public String fileName;
2525: public FileChannel fchannel;
2526: public long pos;
2527: public long length;
2528: // KeepAlive flag
2529: public boolean keepAlive;
2530: }
2531:
2532: }
|