0001: /*
0002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
0003: *
0004: * This file is part of Resin(R) Open Source
0005: *
0006: * Each copy or derived work must preserve the copyright notice and this
0007: * notice unmodified.
0008: *
0009: * Resin Open Source is free software; you can redistribute it and/or modify
0010: * it under the terms of the GNU General Public License as published by
0011: * the Free Software Foundation; either version 2 of the License, or
0012: * (at your option) any later version.
0013: *
0014: * Resin Open Source is distributed in the hope that it will be useful,
0015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
0017: * of NON-INFRINGEMENT. See the GNU General Public License for more
0018: * details.
0019: *
0020: * You should have received a copy of the GNU General Public License
0021: * along with Resin Open Source; if not, write to the
0022: *
0023: * Free Software Foundation, Inc.
0024: * 59 Temple Place, Suite 330
0025: * Boston, MA 02111-1307 USA
0026: *
0027: * @author Scott Ferguson
0028: */
0029:
0030: package com.caucho.server.port;
0031:
0032: import com.caucho.config.Config;
0033: import com.caucho.config.ConfigException;
0034: import com.caucho.config.types.Period;
0035: import com.caucho.lifecycle.Lifecycle;
0036: import com.caucho.loader.Environment;
0037: import com.caucho.loader.EnvironmentBean;
0038: import com.caucho.loader.EnvironmentClassLoader;
0039: import com.caucho.loader.EnvironmentListener;
0040: import com.caucho.log.Log;
0041: import com.caucho.management.server.PortMXBean;
0042: import com.caucho.server.connection.ConnectionController;
0043: import com.caucho.server.cluster.ClusterServer;
0044: import com.caucho.server.cluster.Server;
0045: import com.caucho.util.*;
0046: import com.caucho.vfs.JsseSSLFactory;
0047: import com.caucho.vfs.QJniServerSocket;
0048: import com.caucho.vfs.QServerSocket;
0049: import com.caucho.vfs.QSocket;
0050: import com.caucho.vfs.SSLFactory;
0051:
0052: import javax.annotation.PostConstruct;
0053: import java.net.ConnectException;
0054: import java.net.InetAddress;
0055: import java.net.InetSocketAddress;
0056: import java.net.Socket;
0057: import java.net.UnknownHostException;
0058: import java.util.logging.Level;
0059: import java.util.logging.Logger;
0060: import java.util.ArrayList;
0061:
0062: /**
0063: * Represents a protocol connection.
0064: */
0065: public class Port implements EnvironmentListener, Runnable {
0066: private static final L10N L = new L10N(Port.class);
0067:
0068: private static final Logger log = Logger.getLogger(Port.class
0069: .getName());
0070:
0071: private static final int DEFAULT = -0xcafe;
0072:
0073: // started at 128, but that seems wasteful since the active threads
0074: // themselves are buffering the free connections
0075: private FreeList<TcpConnection> _freeConn = new FreeList<TcpConnection>(
0076: 32);
0077:
0078: // The owning server
0079: private ProtocolDispatchServer _server;
0080:
0081: // The id
0082: private String _serverId = "";
0083:
0084: // The address
0085: private String _address;
0086: // The port
0087: private int _port;
0088:
0089: // The protocol
0090: private Protocol _protocol;
0091:
0092: // The SSL factory, if any
0093: private SSLFactory _sslFactory;
0094:
0095: // Secure override for load-balancers/proxies
0096: private boolean _isSecure;
0097:
0098: private InetAddress _socketAddress;
0099:
0100: private int _acceptThreadMin = DEFAULT;
0101: private int _acceptThreadMax = DEFAULT;
0102:
0103: private int _acceptListenBacklog = DEFAULT;
0104:
0105: private int _connectionMax = 512;
0106:
0107: private int _keepaliveMax = DEFAULT;
0108:
0109: private long _keepaliveTimeMax = DEFAULT;
0110: private long _keepaliveTimeout = DEFAULT;
0111: private long _keepaliveSelectThreadTimeout = DEFAULT;
0112: private int _minSpareConnection = 16;
0113:
0114: // default timeout
0115: private long _socketTimeout = DEFAULT;
0116:
0117: private long _suspendTimeMax = DEFAULT;
0118:
0119: private boolean _tcpNoDelay = true;
0120:
0121: // The virtual host name
0122: private String _virtualHost;
0123:
0124: private final PortAdmin _admin = new PortAdmin(this );
0125:
0126: // the server socket
0127: private QServerSocket _serverSocket;
0128:
0129: // the throttle
0130: private Throttle _throttle;
0131:
0132: // the selection manager
0133: private AbstractSelectManager _selectManager;
0134:
0135: private ArrayList<TcpConnection> _suspendList = new ArrayList<TcpConnection>();
0136:
0137: private Alarm _suspendAlarm;
0138:
0139: // statistics
0140:
0141: private volatile int _threadCount;
0142: private final Object _threadCountLock = new Object();
0143:
0144: private volatile int _idleThreadCount;
0145: private volatile int _startThreadCount;
0146:
0147: private volatile int _connectionCount;
0148:
0149: private volatile long _lifetimeRequestCount;
0150: private volatile long _lifetimeKeepaliveCount;
0151: private volatile long _lifetimeClientDisconnectCount;
0152: private volatile long _lifetimeRequestTime;
0153: private volatile long _lifetimeReadBytes;
0154: private volatile long _lifetimeWriteBytes;
0155:
0156: // total keepalive
0157: private volatile int _keepaliveCount;
0158: // thread-based
0159: private volatile int _keepaliveThreadCount;
0160: private final Object _keepaliveCountLock = new Object();
0161:
0162: // True if the port has been bound
0163: private volatile boolean _isBind;
0164: private volatile boolean _isPostBind;
0165:
0166: // The port lifecycle
0167: private final Lifecycle _lifecycle = new Lifecycle();
0168:
0169: public Port() {
0170: }
0171:
0172: public Port(ClusterServer server) {
0173: }
0174:
0175: /**
0176: * Sets the containing server.
0177: */
0178: public void setParent(ProtocolDispatchServer parent) {
0179: setServer(parent);
0180: }
0181:
0182: /**
0183: * Sets the server.
0184: */
0185: public void setServer(ProtocolDispatchServer protocolServer) {
0186: _server = protocolServer;
0187:
0188: if (_protocol != null)
0189: _protocol.setServer(protocolServer);
0190:
0191: if (protocolServer instanceof Server) {
0192: Server server = (Server) protocolServer;
0193:
0194: if (_acceptThreadMax == DEFAULT)
0195: _acceptThreadMax = server.getAcceptThreadMax();
0196:
0197: if (_acceptThreadMin == DEFAULT)
0198: _acceptThreadMin = server.getAcceptThreadMin();
0199:
0200: if (_acceptListenBacklog == DEFAULT)
0201: _acceptListenBacklog = server.getAcceptListenBacklog();
0202:
0203: if (_keepaliveMax == DEFAULT)
0204: _keepaliveMax = server.getKeepaliveMax();
0205:
0206: if (_keepaliveTimeMax == DEFAULT)
0207: _keepaliveTimeMax = server
0208: .getKeepaliveConnectionTimeMax();
0209:
0210: if (_keepaliveTimeout == DEFAULT)
0211: _keepaliveTimeout = server.getKeepaliveTimeout();
0212:
0213: if (_keepaliveSelectThreadTimeout == DEFAULT) {
0214: _keepaliveSelectThreadTimeout = server
0215: .getKeepaliveSelectThreadTimeout();
0216: }
0217:
0218: if (_suspendTimeMax == DEFAULT)
0219: _suspendTimeMax = server.getSuspendTimeMax();
0220:
0221: if (_socketTimeout == DEFAULT)
0222: _socketTimeout = server.getSocketTimeout();
0223: }
0224: }
0225:
0226: /**
0227: * Gets the server.
0228: */
0229: public ProtocolDispatchServer getServer() {
0230: return _server;
0231: }
0232:
0233: /**
0234: * Sets the id.
0235: */
0236: public void setId(String id) {
0237: _serverId = id;
0238: }
0239:
0240: /**
0241: * Sets the server id.
0242: */
0243: public void setServerId(String id) {
0244: _serverId = id;
0245: }
0246:
0247: /**
0248: * Gets the server id.
0249: */
0250: public String getServerId() {
0251: return _serverId;
0252: }
0253:
0254: public PortMXBean getAdmin() {
0255: return _admin;
0256: }
0257:
0258: /**
0259: * Sets protocol class.
0260: */
0261: public void setType(Class cl) throws InstantiationException,
0262: IllegalAccessException {
0263: setClass(cl);
0264: }
0265:
0266: /**
0267: * Sets protocol class.
0268: */
0269: public void setClass(Class cl) throws InstantiationException,
0270: IllegalAccessException {
0271: Config.validate(cl, Protocol.class);
0272:
0273: _protocol = (Protocol) cl.newInstance();
0274: }
0275:
0276: public Object createInit() throws ConfigException {
0277: if (_protocol == null)
0278: throw new ConfigException(L
0279: .l("<init> requires a protocol class"));
0280:
0281: return _protocol;
0282: }
0283:
0284: /**
0285: * Set protocol.
0286: */
0287: public void setProtocol(Protocol protocol) throws ConfigException {
0288: /* server/0170
0289: if (_server == null)
0290: throw new IllegalStateException(L.l("Server is not set."));
0291: */
0292:
0293: _protocol = protocol;
0294: _protocol.setServer(_server);
0295: }
0296:
0297: /**
0298: * Set protocol.
0299: */
0300: public Protocol getProtocol() {
0301: return _protocol;
0302: }
0303:
0304: /**
0305: * Gets the protocol name.
0306: */
0307: public String getProtocolName() {
0308: if (_protocol != null)
0309: return _protocol.getProtocolName();
0310: else
0311: return null;
0312: }
0313:
0314: /**
0315: * Sets the address
0316: */
0317: public void setAddress(String address) throws UnknownHostException {
0318: if ("*".equals(address))
0319: address = null;
0320:
0321: _address = address;
0322: if (address != null)
0323: _socketAddress = InetAddress.getByName(address);
0324: }
0325:
0326: /**
0327: * @deprecated
0328: */
0329: public void setHost(String address) throws UnknownHostException {
0330: setAddress(address);
0331: }
0332:
0333: /**
0334: * Gets the IP address
0335: */
0336: public String getAddress() {
0337: return _address;
0338: }
0339:
0340: /**
0341: * Sets the port.
0342: */
0343: public void setPort(int port) {
0344: _port = port;
0345: }
0346:
0347: /**
0348: * Gets the port.
0349: */
0350: public int getPort() {
0351: return _port;
0352: }
0353:
0354: /**
0355: * Sets the virtual host for IP-based virtual host.
0356: */
0357: public void setVirtualHost(String host) {
0358: _virtualHost = host;
0359: }
0360:
0361: /**
0362: * Gets the virtual host for IP-based virtual host.
0363: */
0364: public String getVirtualHost() {
0365: return _virtualHost;
0366: }
0367:
0368: /**
0369: * Sets the SSL factory
0370: */
0371: public void setSSL(SSLFactory factory) {
0372: _sslFactory = factory;
0373: }
0374:
0375: /**
0376: * Sets the SSL factory
0377: */
0378: public SSLFactory createOpenssl() throws ConfigException {
0379: try {
0380: ClassLoader loader = Thread.currentThread()
0381: .getContextClassLoader();
0382:
0383: Class cl = Class.forName("com.caucho.vfs.OpenSSLFactory",
0384: false, loader);
0385:
0386: _sslFactory = (SSLFactory) cl.newInstance();
0387:
0388: return _sslFactory;
0389: } catch (Throwable e) {
0390: log.log(Level.FINER, e.toString(), e);
0391:
0392: throw new ConfigException(
0393: L
0394: .l("<openssl> requires Resin Professional. See http://www.caucho.com for more information."));
0395: }
0396: }
0397:
0398: /**
0399: * Sets the SSL factory
0400: */
0401: public JsseSSLFactory createJsse() {
0402: // should probably check that openssl exists
0403: return new JsseSSLFactory();
0404: }
0405:
0406: /**
0407: * Sets the SSL factory
0408: */
0409: public void setJsseSsl(JsseSSLFactory factory) {
0410: _sslFactory = factory;
0411: }
0412:
0413: /**
0414: * Gets the SSL factory.
0415: */
0416: public SSLFactory getSSL() {
0417: return _sslFactory;
0418: }
0419:
0420: /**
0421: * Returns true for ssl.
0422: */
0423: public boolean isSSL() {
0424: return _sslFactory != null;
0425: }
0426:
0427: /**
0428: * Sets true for secure
0429: */
0430: public void setSecure(boolean isSecure) {
0431: _isSecure = isSecure;
0432: }
0433:
0434: /**
0435: * Return true for secure
0436: */
0437: public boolean isSecure() {
0438: return _isSecure || _sslFactory != null;
0439: }
0440:
0441: /**
0442: * Sets the server socket.
0443: */
0444: public void setServerSocket(QServerSocket socket) {
0445: _serverSocket = socket;
0446: }
0447:
0448: //
0449: // Configuration/Tuning
0450: //
0451:
0452: /**
0453: * Sets the minimum spare listen.
0454: */
0455: public void setAcceptThreadMin(int minSpare) throws ConfigException {
0456: if (minSpare < 1)
0457: throw new ConfigException(L
0458: .l("accept-thread-min must be at least 1."));
0459:
0460: _acceptThreadMin = minSpare;
0461: }
0462:
0463: /**
0464: * The minimum spare threads.
0465: */
0466: public int getAcceptThreadMin() {
0467: return _acceptThreadMin;
0468: }
0469:
0470: /**
0471: * Sets the minimum spare listen.
0472: */
0473: public void setAcceptThreadMax(int maxSpare) throws ConfigException {
0474: if (maxSpare < 1)
0475: throw new ConfigException(L
0476: .l("accept-thread-max must be at least 1."));
0477:
0478: _acceptThreadMax = maxSpare;
0479: }
0480:
0481: /**
0482: * The maximum spare threads.
0483: */
0484: public int getAcceptThreadMax() {
0485: return _acceptThreadMax;
0486: }
0487:
0488: /**
0489: * Sets the operating system listen backlog
0490: */
0491: public void setAcceptListenBacklog(int listen)
0492: throws ConfigException {
0493: if (listen < 1)
0494: throw new ConfigException(L
0495: .l("accept-listen-backlog must be at least 1."));
0496:
0497: _acceptListenBacklog = listen;
0498: }
0499:
0500: /**
0501: * The operating system listen backlog
0502: */
0503: public int getAcceptListenBacklog() {
0504: return _acceptListenBacklog;
0505: }
0506:
0507: /**
0508: * Sets the connection max.
0509: */
0510: public void setConnectionMax(int max) {
0511: _connectionMax = max;
0512: }
0513:
0514: /**
0515: * Gets the connection max.
0516: */
0517: public int getConnectionMax() {
0518: return _connectionMax;
0519: }
0520:
0521: /**
0522: * Returns true for ignore-client-disconnect.
0523: */
0524: public boolean isIgnoreClientDisconnect() {
0525: return _server.isIgnoreClientDisconnect();
0526: }
0527:
0528: /**
0529: * Sets the read/write timeout for the accepted sockets.
0530: */
0531: public void setSocketTimeout(Period period) {
0532: _socketTimeout = period.getPeriod();
0533: }
0534:
0535: /**
0536: * Sets the read timeout for the accepted sockets.
0537: *
0538: * @deprecated
0539: */
0540: public void setReadTimeout(Period period) {
0541: setSocketTimeout(period);
0542: }
0543:
0544: /**
0545: * Gets the read timeout for the accepted sockets.
0546: */
0547: public long getSocketTimeout() {
0548: return _socketTimeout;
0549: }
0550:
0551: /**
0552: * Gets the tcp-no-delay property
0553: */
0554: public boolean getTcpNoDelay() {
0555: return _tcpNoDelay;
0556: }
0557:
0558: /**
0559: * Sets the tcp-no-delay property
0560: */
0561: public void setTcpNoDelay(boolean tcpNoDelay) {
0562: _tcpNoDelay = tcpNoDelay;
0563: }
0564:
0565: /**
0566: * Configures the throttle.
0567: */
0568: public void setThrottleConcurrentMax(int max) {
0569: Throttle throttle = createThrottle();
0570:
0571: throttle.setMaxConcurrentRequests(max);
0572: }
0573:
0574: /**
0575: * Configures the throttle.
0576: */
0577: public long getThrottleConcurrentMax() {
0578: if (_throttle != null)
0579: return _throttle.getMaxConcurrentRequests();
0580: else
0581: return -1;
0582: }
0583:
0584: /**
0585: * Sets the write timeout for the accepted sockets.
0586: *
0587: * @deprecated
0588: */
0589: public void setWriteTimeout(Period period) {
0590: }
0591:
0592: private Throttle createThrottle() {
0593: if (_throttle == null) {
0594: _throttle = Throttle.createPro();
0595:
0596: if (_throttle == null)
0597: throw new ConfigException(
0598: L
0599: .l("throttle configuration requires Resin Professional"));
0600: }
0601:
0602: return _throttle;
0603: }
0604:
0605: //
0606: // compat config
0607: //
0608:
0609: /**
0610: * Sets the minimum spare listen.
0611: */
0612: public void setMinSpareListen(int minSpare) throws ConfigException {
0613: setAcceptThreadMin(minSpare);
0614: }
0615:
0616: /**
0617: * Sets the maximum spare listen.
0618: */
0619: public void setMaxSpareListen(int maxSpare) throws ConfigException {
0620: setAcceptThreadMax(maxSpare);
0621: }
0622:
0623: //
0624: // statistics
0625: //
0626:
0627: /**
0628: * Returns the number of connections
0629: */
0630: public int getConnectionCount() {
0631: return _connectionCount;
0632: }
0633:
0634: /**
0635: * Returns the number of comet connections.
0636: */
0637: public int getCometIdleCount() {
0638: return _suspendList.size();
0639: }
0640:
0641: public long getLifetimeRequestCount() {
0642: return _lifetimeRequestCount;
0643: }
0644:
0645: public long getLifetimeKeepaliveCount() {
0646: return _lifetimeKeepaliveCount;
0647: }
0648:
0649: public long getLifetimeClientDisconnectCount() {
0650: return _lifetimeClientDisconnectCount;
0651: }
0652:
0653: public long getLifetimeRequestTime() {
0654: return _lifetimeRequestTime;
0655: }
0656:
0657: public long getLifetimeReadBytes() {
0658: return _lifetimeReadBytes;
0659: }
0660:
0661: public long getLifetimeWriteBytes() {
0662: return _lifetimeWriteBytes;
0663: }
0664:
0665: /**
0666: * Sets the keepalive max.
0667: */
0668: public void setKeepaliveMax(int max) {
0669: _keepaliveMax = max;
0670: }
0671:
0672: /**
0673: * Gets the keepalive max.
0674: */
0675: public int getKeepaliveMax() {
0676: return _keepaliveMax;
0677: }
0678:
0679: /**
0680: * Sets the keepalive max.
0681: */
0682: public void setKeepaliveConnectionTimeMax(Period period) {
0683: _keepaliveTimeMax = period.getPeriod();
0684: }
0685:
0686: /**
0687: * Gets the keepalive max.
0688: */
0689: public long getKeepaliveConnectionTimeMax() {
0690: return _keepaliveTimeMax;
0691: }
0692:
0693: /**
0694: * Gets the suspend max.
0695: */
0696: public long getSuspendTimeMax() {
0697: return _suspendTimeMax;
0698: }
0699:
0700: public void setSuspendTimeMax(Period period) {
0701: _suspendTimeMax = period.getPeriod();
0702: }
0703:
0704: public void setKeepaliveTimeout(Period period) {
0705: _keepaliveTimeout = period.getPeriod();
0706: }
0707:
0708: public long getKeepaliveTimeout() {
0709: return _keepaliveTimeout;
0710: }
0711:
0712: public long getKeepaliveSelectThreadTimeout() {
0713: return _keepaliveSelectThreadTimeout;
0714: }
0715:
0716: //
0717: // statistics
0718: //
0719:
0720: /**
0721: * Returns the thread count.
0722: */
0723: public int getThreadCount() {
0724: return _threadCount;
0725: }
0726:
0727: /**
0728: * Returns the active thread count.
0729: */
0730: public int getActiveThreadCount() {
0731: return _threadCount - _idleThreadCount;
0732: }
0733:
0734: /**
0735: * Returns the count of idle threads.
0736: */
0737: public int getIdleThreadCount() {
0738: return _idleThreadCount;
0739: }
0740:
0741: /**
0742: * Returns the number of keepalive connections
0743: */
0744: public int getKeepaliveCount() {
0745: synchronized (_keepaliveCountLock) {
0746: return _keepaliveCount;
0747: }
0748: }
0749:
0750: public Lifecycle getLifecycleState() {
0751: return _lifecycle;
0752: }
0753:
0754: /**
0755: * Returns true if the port is active.
0756: */
0757: public boolean isActive() {
0758: return _lifecycle.isActive();
0759: }
0760:
0761: /**
0762: * Returns the active connections.
0763: */
0764: public int getActiveConnectionCount() {
0765: return _threadCount - _idleThreadCount;
0766: }
0767:
0768: /**
0769: * Returns the keepalive connections.
0770: */
0771: public int getKeepaliveConnectionCount() {
0772: return getKeepaliveCount();
0773: }
0774:
0775: /**
0776: * Returns the number of keepalive connections
0777: */
0778: public int getKeepaliveThreadCount() {
0779: synchronized (_keepaliveCountLock) {
0780: return _keepaliveThreadCount;
0781: }
0782: }
0783:
0784: /**
0785: * Returns the number of connections in the select.
0786: */
0787: public int getSelectConnectionCount() {
0788: if (_selectManager != null)
0789: return _selectManager.getSelectCount();
0790: else
0791: return -1;
0792: }
0793:
0794: /**
0795: * Returns the accept pool.
0796: */
0797: public int getFreeKeepalive() {
0798: int freeKeepalive = _keepaliveMax - _keepaliveCount;
0799: int freeConnections = _connectionMax - _connectionCount
0800: - _minSpareConnection;
0801: int freeSelect = _server.getFreeSelectKeepalive();
0802:
0803: if (freeKeepalive < freeConnections)
0804: return freeSelect < freeKeepalive ? freeSelect
0805: : freeKeepalive;
0806: else
0807: return freeSelect < freeConnections ? freeSelect
0808: : freeConnections;
0809: }
0810:
0811: /**
0812: * Returns true if the port matches the server id.
0813: */
0814: public boolean matchesServerId(String serverId) {
0815: return getServerId().equals("*")
0816: || getServerId().equals(serverId);
0817: }
0818:
0819: /**
0820: * Initializes the port.
0821: */
0822: @PostConstruct
0823: public void init() throws ConfigException {
0824: if (!_lifecycle.toInit())
0825: return;
0826:
0827: if (_server instanceof EnvironmentBean)
0828: Environment.addEnvironmentListener(this ,
0829: ((EnvironmentBean) _server).getClassLoader());
0830: }
0831:
0832: /**
0833: * Starts the port listening.
0834: */
0835: public void bind() throws Exception {
0836: synchronized (this ) {
0837: if (_isBind)
0838: return;
0839: _isBind = true;
0840: }
0841:
0842: if (_protocol == null)
0843: throw new IllegalStateException(
0844: L
0845: .l(
0846: "`{0}' must have a configured protocol before starting.",
0847: this ));
0848:
0849: if (_port == 0)
0850: return;
0851:
0852: if (_throttle == null)
0853: _throttle = new Throttle();
0854:
0855: if (_serverSocket != null) {
0856: if (_port == 0) {
0857: } else if (_address != null)
0858: log.info("listening to " + _address + ":" + _port);
0859: else
0860: log.info("listening to " + _port);
0861: } else if (_sslFactory != null && _socketAddress != null) {
0862: _serverSocket = _sslFactory.create(_socketAddress, _port);
0863:
0864: log.info(_protocol.getProtocolName() + "s listening to "
0865: + _socketAddress.getHostName() + ":" + _port);
0866: } else if (_sslFactory != null) {
0867: if (_address == null) {
0868: _serverSocket = _sslFactory.create(null, _port);
0869: log.info(_protocol.getProtocolName()
0870: + "s listening to *:" + _port);
0871: } else {
0872: InetAddress addr = InetAddress.getByName(_address);
0873:
0874: _serverSocket = _sslFactory.create(addr, _port);
0875:
0876: log.info(_protocol.getProtocolName()
0877: + "s listening to " + _address + ":" + _port);
0878: }
0879: } else if (_socketAddress != null) {
0880: _serverSocket = QJniServerSocket.create(_socketAddress,
0881: _port, _acceptListenBacklog);
0882:
0883: log.info(_protocol.getProtocolName() + " listening to "
0884: + _socketAddress.getHostName() + ":" + _port);
0885: } else {
0886: _serverSocket = QJniServerSocket.create(_port,
0887: _acceptListenBacklog);
0888:
0889: log.info(_protocol.getProtocolName() + " listening to *:"
0890: + _port);
0891: }
0892:
0893: assert (_serverSocket != null);
0894:
0895: postBind();
0896: }
0897:
0898: /**
0899: * Starts the port listening.
0900: */
0901: public void bind(QServerSocket ss) throws Exception {
0902: if (ss == null)
0903: throw new NullPointerException();
0904:
0905: _isBind = true;
0906:
0907: if (_protocol == null)
0908: throw new IllegalStateException(
0909: L
0910: .l(
0911: "'{0}' must have a configured protocol before starting.",
0912: this ));
0913:
0914: if (_throttle == null)
0915: _throttle = new Throttle();
0916:
0917: _serverSocket = ss;
0918:
0919: String scheme = _protocol.getProtocolName();
0920:
0921: if (_address != null)
0922: log
0923: .info(scheme + " listening to " + _address + ":"
0924: + _port);
0925: else
0926: log.info(scheme + " listening to *:" + _port);
0927:
0928: if (_sslFactory != null)
0929: _serverSocket = _sslFactory.bind(_serverSocket);
0930: }
0931:
0932: public void postBind() {
0933: synchronized (this ) {
0934: if (_isPostBind)
0935: return;
0936: _isPostBind = true;
0937: }
0938:
0939: if (_tcpNoDelay)
0940: _serverSocket.setTcpNoDelay(_tcpNoDelay);
0941:
0942: _serverSocket
0943: .setConnectionSocketTimeout((int) getSocketTimeout());
0944:
0945: if (_keepaliveMax < 0)
0946: _keepaliveMax = _server.getKeepaliveMax();
0947:
0948: if (_keepaliveMax < 0)
0949: _keepaliveMax = 256;
0950:
0951: if (_serverSocket.isJNI() && _server.isSelectManagerEnabled()) {
0952: _selectManager = _server.getSelectManager();
0953:
0954: if (_selectManager == null) {
0955: throw new IllegalStateException(L
0956: .l("Cannot load select manager"));
0957: }
0958: }
0959:
0960: _admin.register();
0961: }
0962:
0963: /**
0964: * binds for the watchdog.
0965: */
0966: public QServerSocket bindForWatchdog() throws java.io.IOException {
0967: QServerSocket ss;
0968:
0969: if (_socketAddress != null) {
0970: ss = QJniServerSocket.createJNI(_socketAddress, _port);
0971:
0972: if (ss == null)
0973: return null;
0974:
0975: log.fine(this + " watchdog binding to "
0976: + _socketAddress.getHostName() + ":" + _port);
0977: } else {
0978: ss = QJniServerSocket.createJNI(null, _port);
0979:
0980: if (ss == null)
0981: return null;
0982:
0983: log.fine(this + " watchdog binding to *:" + _port);
0984: }
0985:
0986: if (!ss.isJNI()) {
0987: ss.close();
0988:
0989: return ss;
0990: }
0991:
0992: if (_tcpNoDelay)
0993: ss.setTcpNoDelay(_tcpNoDelay);
0994:
0995: ss.setConnectionSocketTimeout((int) getSocketTimeout());
0996:
0997: return ss;
0998: }
0999:
1000: /**
1001: * Starts the port listening.
1002: */
1003: public void start() throws Exception {
1004: if (_port == 0)
1005: return;
1006:
1007: if (!_lifecycle.toStarting())
1008: return;
1009:
1010: boolean isValid = false;
1011: try {
1012: bind();
1013: postBind();
1014:
1015: String name = "resin-port-" + _serverSocket.getLocalPort();
1016: Thread thread = new Thread(this , name);
1017: thread.setDaemon(true);
1018:
1019: thread.start();
1020:
1021: enable();
1022:
1023: _suspendAlarm = new Alarm(new SuspendReaper());
1024: _suspendAlarm.queue(60000);
1025:
1026: isValid = true;
1027: } finally {
1028: if (!isValid)
1029: close();
1030: }
1031: }
1032:
1033: /**
1034: * Starts the port listening for new connections.
1035: */
1036: void enable() {
1037: if (_lifecycle.toActive()) {
1038: _serverSocket.listen(_acceptListenBacklog);
1039: }
1040: }
1041:
1042: /**
1043: * Stops the port from listening for new connections.
1044: */
1045: void disable() {
1046: if (_lifecycle.toStop()) {
1047: _serverSocket.listen(0);
1048:
1049: if (_port == 0) {
1050: } else if (_address != null)
1051: log.info(_protocol.getProtocolName() + " disabled "
1052: + _address + ":" + _port);
1053: else
1054: log.info(_protocol.getProtocolName() + " disabled *:"
1055: + _port);
1056: }
1057: }
1058:
1059: /**
1060: * returns the select manager.
1061: */
1062: public AbstractSelectManager getSelectManager() {
1063: return _selectManager;
1064: }
1065:
1066: /**
1067: * Accepts a new connection.
1068: */
1069: public boolean accept(TcpConnection conn, boolean isFirst) {
1070: try {
1071: synchronized (this ) {
1072: _idleThreadCount++;
1073:
1074: if (isFirst) {
1075: _startThreadCount--;
1076:
1077: if (_startThreadCount < 0) {
1078: Thread.dumpStack();
1079: }
1080: }
1081:
1082: if (_acceptThreadMax < _idleThreadCount) {
1083: return false;
1084: }
1085: }
1086:
1087: while (_lifecycle.isActive()) {
1088: QSocket socket = conn.startSocket();
1089:
1090: Thread.interrupted();
1091: if (_serverSocket.accept(socket)) {
1092: conn.initSocket();
1093:
1094: if (_throttle.accept(socket))
1095: return true;
1096: else
1097: socket.close();
1098: } else {
1099: if (_acceptThreadMax < _idleThreadCount) {
1100: return false;
1101: }
1102: }
1103: }
1104: } catch (Throwable e) {
1105: if (_lifecycle.isActive() && log.isLoggable(Level.FINER))
1106: log.log(Level.FINER, e.toString(), e);
1107: } finally {
1108: synchronized (this ) {
1109: _idleThreadCount--;
1110:
1111: if (_idleThreadCount + _startThreadCount < _acceptThreadMin) {
1112: notify();
1113: }
1114: }
1115: }
1116:
1117: return false;
1118: }
1119:
1120: /**
1121: * Notification when a socket closes.
1122: */
1123: void closeSocket(QSocket socket) {
1124: if (_throttle != null)
1125: _throttle.close(socket);
1126: }
1127:
1128: /**
1129: * Registers the new connection as started
1130: */
1131: void startConnection(TcpConnection conn) {
1132: synchronized (this ) {
1133: _startThreadCount--;
1134: }
1135: }
1136:
1137: /**
1138: * Marks a new thread as running.
1139: */
1140: void threadBegin(TcpConnection conn) {
1141: synchronized (_threadCountLock) {
1142: _threadCount++;
1143: }
1144: }
1145:
1146: /**
1147: * Marks a new thread as stopped.
1148: */
1149: void threadEnd(TcpConnection conn) {
1150: synchronized (_threadCountLock) {
1151: _threadCount--;
1152: }
1153: }
1154:
1155: /**
1156: * Returns true if the keepalive is allowed
1157: */
1158: public boolean allowKeepalive(long acceptStartTime) {
1159: synchronized (_keepaliveCountLock) {
1160: if (!_lifecycle.isActive())
1161: return false;
1162: else if (acceptStartTime + _keepaliveTimeMax < Alarm
1163: .getCurrentTime())
1164: return false;
1165: else if (_keepaliveMax <= _keepaliveCount)
1166: return false;
1167: else if (_connectionMax <= _connectionCount
1168: + _minSpareConnection)
1169: return false;
1170: else
1171: return true;
1172: }
1173: }
1174:
1175: /**
1176: * Marks a keepalive as starting running. Called only from TcpConnection.
1177: */
1178: boolean keepaliveBegin(TcpConnection conn, long acceptStartTime) {
1179: synchronized (_keepaliveCountLock) {
1180: if (!_lifecycle.isActive())
1181: return false;
1182: else if (_connectionMax <= _connectionCount
1183: + _minSpareConnection) {
1184: log.warning(conn + " failed keepalive connection max "
1185: + _connectionCount);
1186:
1187: return false;
1188: } else if (false && acceptStartTime + _keepaliveTimeMax < Alarm
1189: .getCurrentTime()) {
1190: // #2262 - skip this check to avoid confusing the load balancer
1191: // the keepalive check is in allowKeepalive
1192: log.warning(conn + " failed keepalive delay "
1193: + (Alarm.getCurrentTime() - acceptStartTime));
1194:
1195: return false;
1196: } else if (false && _keepaliveMax <= _keepaliveCount) {
1197: // #2262 - skip this check to avoid confusing the load balancer
1198: // the keepalive check is in allowKeepalive
1199: log.warning(conn + " failed keepalive max "
1200: + _keepaliveCount);
1201:
1202: return false;
1203: }
1204:
1205: _keepaliveCount++;
1206:
1207: return true;
1208: }
1209: }
1210:
1211: /**
1212: * Marks the keepalive as ending. Called only from TcpConnection.
1213: */
1214: void keepaliveEnd(TcpConnection conn) {
1215: synchronized (_keepaliveCountLock) {
1216: _keepaliveCount--;
1217:
1218: if (_keepaliveCount < 0) {
1219: int count = _keepaliveCount;
1220: _keepaliveCount = 0;
1221:
1222: log.warning("internal error: negative keepalive count "
1223: + count);
1224: }
1225: }
1226: }
1227:
1228: /**
1229: * Starts a keepalive thread.
1230: */
1231: void keepaliveThreadBegin() {
1232: synchronized (_keepaliveCountLock) {
1233: _keepaliveThreadCount++;
1234: }
1235: }
1236:
1237: /**
1238: * Ends a keepalive thread.
1239: */
1240: void keepaliveThreadEnd() {
1241: synchronized (_keepaliveCountLock) {
1242: _keepaliveThreadCount--;
1243: }
1244: }
1245:
1246: /**
1247: * Suspends the controller (for comet-style ajax)
1248: */
1249: boolean suspend(TcpConnection conn) {
1250: boolean isResume = false;
1251:
1252: synchronized (_suspendList) {
1253: if (conn.isWake()) {
1254: isResume = true;
1255: conn.setResume();
1256: } else if (conn.isComet()) {
1257: _suspendList.add(conn);
1258: return true;
1259: } else
1260: return false;
1261: }
1262:
1263: if (isResume) {
1264: ThreadPool.getThreadPool().schedule(conn);
1265: return true;
1266: } else
1267: return false;
1268: }
1269:
1270: /**
1271: * Remove from suspend list.
1272: */
1273: boolean detach(TcpConnection conn) {
1274: synchronized (_suspendList) {
1275: return _suspendList.remove(conn);
1276: }
1277: }
1278:
1279: /**
1280: * Suspends the controller (for comet-style ajax)
1281: */
1282: boolean resume(TcpConnection conn) {
1283: synchronized (_suspendList) {
1284: if (!_suspendList.remove(conn))
1285: return false;
1286:
1287: conn.setResume();
1288: }
1289:
1290: if (conn != null)
1291: ThreadPool.getThreadPool().schedule(conn);
1292:
1293: return true;
1294: }
1295:
1296: /**
1297: * Returns true if the port is closed.
1298: */
1299: public boolean isClosed() {
1300: return _lifecycle.isAfterActive();
1301: }
1302:
1303: /**
1304: * The port thread is responsible for creating new connections.
1305: */
1306: public void run() {
1307: while (!_lifecycle.isDestroyed()) {
1308: boolean isStart;
1309:
1310: try {
1311: // need delay to avoid spawing too many threads over a short time,
1312: // when the load doesn't justify it
1313: Thread.yield();
1314: Thread.sleep(10);
1315:
1316: synchronized (this ) {
1317: isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;
1318: if (_connectionMax <= _connectionCount)
1319: isStart = false;
1320:
1321: if (!isStart) {
1322: Thread.interrupted();
1323: wait(60000);
1324: }
1325:
1326: if (isStart) {
1327: _connectionCount++;
1328: _startThreadCount++;
1329: }
1330: }
1331:
1332: if (isStart && _lifecycle.isActive()) {
1333: TcpConnection conn = _freeConn.allocate();
1334: if (conn == null) {
1335: conn = new TcpConnection(this , _serverSocket
1336: .createSocket());
1337: conn.setRequest(_protocol.createRequest(conn));
1338: }
1339:
1340: conn.start();
1341:
1342: ThreadPool.getThreadPool().schedule(conn);
1343: }
1344: } catch (Throwable e) {
1345: e.printStackTrace();
1346: }
1347: }
1348: }
1349:
1350: /**
1351: * Handles the environment config phase
1352: */
1353: public void environmentConfig(EnvironmentClassLoader loader) {
1354: }
1355:
1356: /**
1357: * Handles the case where the environment is starting (after init).
1358: */
1359: public void environmentStart(EnvironmentClassLoader loader) {
1360: }
1361:
1362: /**
1363: * Handles the case where the environment is stopping
1364: */
1365: public void environmentStop(EnvironmentClassLoader loader) {
1366: close();
1367: }
1368:
1369: /**
1370: * Frees the connection.
1371: *
1372: * Called only from TcpConnection
1373: */
1374: void free(TcpConnection conn) {
1375: closeConnection(conn);
1376:
1377: if (!_freeConn.free(conn))
1378: conn.destroy();
1379: }
1380:
1381: /**
1382: * Frees the connection.
1383: *
1384: * Called only from TcpConnection
1385: */
1386: void kill(TcpConnection conn) {
1387: closeConnection(conn);
1388: }
1389:
1390: /**
1391: * Closes the stats for the connection.
1392: */
1393: private void closeConnection(TcpConnection conn) {
1394: synchronized (this ) {
1395: if (_connectionCount-- == _connectionMax) {
1396: try {
1397: notify();
1398: } catch (Throwable e) {
1399: }
1400: }
1401: }
1402: }
1403:
1404: /**
1405: * Shuts the Port down. The server gives connections 30
1406: * seconds to complete.
1407: */
1408: public void close() {
1409: Environment.removeEnvironmentListener(this );
1410:
1411: if (!_lifecycle.toDestroy())
1412: return;
1413:
1414: if (log.isLoggable(Level.FINE))
1415: log.fine(this + " closing");
1416:
1417: Alarm suspendAlarm = _suspendAlarm;
1418: _suspendAlarm = null;
1419:
1420: if (suspendAlarm != null)
1421: suspendAlarm.dequeue();
1422:
1423: QServerSocket serverSocket = _serverSocket;
1424: _serverSocket = null;
1425:
1426: _selectManager = null;
1427: AbstractSelectManager selectManager = null;
1428:
1429: if (_server != null) {
1430: selectManager = _server.getSelectManager();
1431: _server.initSelectManager(null);
1432: }
1433:
1434: InetAddress localAddress = null;
1435: int localPort = 0;
1436: if (serverSocket != null) {
1437: localAddress = serverSocket.getLocalAddress();
1438: localPort = serverSocket.getLocalPort();
1439: }
1440:
1441: // close the server socket
1442: if (serverSocket != null) {
1443: try {
1444: serverSocket.close();
1445: } catch (Throwable e) {
1446: }
1447:
1448: try {
1449: synchronized (serverSocket) {
1450: serverSocket.notifyAll();
1451: }
1452: } catch (Throwable e) {
1453: }
1454: }
1455:
1456: if (selectManager != null) {
1457: try {
1458: selectManager.close();
1459: } catch (Throwable e) {
1460: }
1461: }
1462:
1463: /*
1464: ArrayList<TcpConnection> connections = new ArrayList<TcpConnection>();
1465: synchronized (this) {
1466: connections.addAll(_activeConnections);
1467: }
1468: */
1469:
1470: // Close the socket server socket and send some request to make
1471: // sure the Port accept thread is woken and dies.
1472: // The ping is before the server socket closes to avoid
1473: // confusing the threads
1474: // ping the accept port to wake the listening threads
1475: if (localPort > 0) {
1476: int idleCount = _idleThreadCount + _startThreadCount;
1477:
1478: for (int i = 0; i < idleCount + 10; i++) {
1479: try {
1480: Socket socket = new Socket();
1481: InetSocketAddress addr;
1482:
1483: if (localAddress == null
1484: || localAddress.getHostAddress()
1485: .startsWith("0."))
1486: addr = new InetSocketAddress("127.0.0.1",
1487: localPort);
1488: else
1489: addr = new InetSocketAddress(localAddress,
1490: localPort);
1491:
1492: socket.connect(addr, 100);
1493:
1494: socket.close();
1495: } catch (ConnectException e) {
1496: } catch (Throwable e) {
1497: log.log(Level.FINEST, e.toString(), e);
1498: }
1499: }
1500: }
1501:
1502: TcpConnection conn;
1503: while ((conn = _freeConn.allocate()) != null) {
1504: conn.destroy();
1505: }
1506:
1507: log.finest(this + " closed");
1508: }
1509:
1510: public String toString() {
1511: return "Port[" + getAddress() + ":" + getPort() + "]";
1512: }
1513:
1514: public class SuspendReaper implements AlarmListener {
1515: public void handleAlarm(Alarm alarm) {
1516: try {
1517: ArrayList<TcpConnection> oldList = null;
1518:
1519: long now = Alarm.getCurrentTime();
1520: synchronized (_suspendList) {
1521: for (int i = _suspendList.size() - 1; i >= 0; i--) {
1522: TcpConnection conn = _suspendList.get(i);
1523:
1524: if (conn.getSuspendTime() + _suspendTimeMax < now) {
1525: _suspendList.remove(i);
1526:
1527: if (oldList == null)
1528: oldList = new ArrayList<TcpConnection>();
1529:
1530: oldList.add(conn);
1531: }
1532: }
1533: }
1534:
1535: if (oldList != null) {
1536: for (int i = 0; i < oldList.size(); i++) {
1537: TcpConnection conn = oldList.get(i);
1538:
1539: if (log.isLoggable(Level.FINE))
1540: log.fine(this + " comet idle timeout "
1541: + conn);
1542:
1543: conn.destroy();
1544: }
1545: }
1546: } finally {
1547: if (!isClosed())
1548: alarm.queue(60000);
1549: }
1550: }
1551: }
1552: }
|