0001: /*
0002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
0003: *
0004: * This file is part of Resin(R) Open Source
0005: *
0006: * Each copy or derived work must preserve the copyright notice and this
0007: * notice unmodified.
0008: *
0009: * Resin Open Source is free software; you can redistribute it and/or modify
0010: * it under the terms of the GNU General Public License as published by
0011: * the Free Software Foundation; either version 2 of the License, or
0012: * (at your option) any later version.
0013: *
0014: * Resin Open Source is distributed in the hope that it will be useful,
0015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
0017: * of NON-INFRINGEMENT. See the GNU General Public License for more
0018: * details.
0019: *
0020: * You should have received a copy of the GNU General Public License
0021: * along with Resin Open Source; if not, write to the
0022: *
0023: * Free Software Foundation, Inc.
0024: * 59 Temple Place, Suite 330
0025: * Boston, MA 02111-1307 USA
0026: *
0027: * @author Scott Ferguson
0028: */
0029:
0030: package com.caucho.server.cluster;
0031:
0032: import com.caucho.management.server.ServerConnectorMXBean;
0033: import com.caucho.util.L10N;
0034: import com.caucho.util.Alarm;
0035: import com.caucho.vfs.*;
0036: import com.caucho.server.resin.*;
0037:
0038: import javax.management.ObjectName;
0039: import java.io.IOException;
0040: import java.util.*;
0041: import java.util.logging.Level;
0042: import java.util.logging.Logger;
0043:
0044: /**
0045: * Defines a member of the cluster.
0046: *
0047: * A {@link ClusterClient} obtained with {@link #getClient} is used to actually
0048: * communicate with this ClusterServer when it is active in another instance of
0049: * Resin .
0050: */
0051: public class ServerConnector {
0052: private static final Logger log = Logger
0053: .getLogger(ServerConnector.class.getName());
0054: private static final L10N L = new L10N(ServerConnector.class);
0055:
0056: private static final int ST_NEW = 0;
0057: private static final int ST_STANDBY = 1;
0058: private static final int ST_SESSION_ONLY = 2;
0059: // the following 5 are the active states
0060: private static final int ST_STARTING = 3;
0061: private static final int ST_WARMUP = 4;
0062: private static final int ST_BUSY = 5;
0063: private static final int ST_FAIL = 6;
0064: private static final int ST_ACTIVE = 7;
0065: private static final int ST_CLOSED = 8;
0066:
0067: // number of chunks in the throttling
0068: private static final int WARMUP_MAX = 16;
0069: private static final int WARMUP_MIN = -16;
0070: private static final int[] WARMUP_CONNECTION_MAX = new int[] { 1,
0071: 1, 1, 1, 2, 2, 2, 2, 4, 4, 8, 8, 16, 32, 64, 128 };
0072:
0073: private ClusterServer _server;
0074: private ClusterPort _port;
0075:
0076: private ObjectName _objectName;
0077:
0078: private Cluster _cluster;
0079: private Path _tcpPath;
0080:
0081: private ServerConnectorAdmin _admin;
0082:
0083: private String _debugId;
0084:
0085: private int _maxConnections = Integer.MAX_VALUE / 2;
0086:
0087: private ClusterStream[] _idle = new ClusterStream[64];
0088: private volatile int _idleHead;
0089: private volatile int _idleTail;
0090: private int _idleSize = 16;
0091:
0092: private int _streamCount;
0093:
0094: private long _warmupTime;
0095: private long _warmupChunkTime;
0096:
0097: private long _failRecoverTime;
0098: private long _failChunkTime;
0099:
0100: private volatile int _state = ST_NEW;
0101:
0102: // current connection count
0103: private volatile int _activeCount;
0104: private volatile int _startingCount;
0105:
0106: private volatile int _loadBalanceAllocateCount;
0107:
0108: // numeric value representing the throttle state
0109: private volatile int _warmupState;
0110:
0111: // load management data
0112: private volatile long _lastFailConnectTime;
0113: private volatile long _dynamicFailRecoverTime = 1000L;
0114:
0115: private volatile long _lastFailTime;
0116: private volatile long _lastBusyTime;
0117:
0118: private volatile long _failTime;
0119: private volatile long _firstSuccessTime;
0120: private volatile long _lastSuccessTime;
0121: private volatile long _prevSuccessTime;
0122: private volatile double _latencyFactor;
0123:
0124: // statistics
0125: private volatile long _keepaliveCountTotal;
0126: private volatile long _connectCountTotal;
0127: private volatile long _failCountTotal;
0128: private volatile long _busyCountTotal;
0129:
0130: private volatile double _cpuLoadAvg;
0131: private volatile long _cpuSetTime;
0132:
0133: public ServerConnector(ClusterServer server) {
0134: _server = server;
0135: _cluster = _server.getCluster();
0136: _port = server.getClusterPort();
0137: }
0138:
0139: /**
0140: * Gets the owning cluster.
0141: */
0142: public Cluster getCluster() {
0143: return _cluster;
0144: }
0145:
0146: /**
0147: * Returns the object name.
0148: */
0149: public ObjectName getObjectName() {
0150: return _objectName;
0151: }
0152:
0153: /**
0154: * Returns the admin.
0155: */
0156: public ServerConnectorMXBean getAdmin() {
0157: return _admin;
0158: }
0159:
0160: /**
0161: * Gets the cluster port.
0162: */
0163: public ClusterPort getClusterPort() {
0164: return _port;
0165: }
0166:
0167: /**
0168: * Returns the user-readable id of the target server.
0169: */
0170: public String getId() {
0171: return _server.getId();
0172: }
0173:
0174: /**
0175: * Returns the index of this connection in the connection group.
0176: */
0177: public int getIndex() {
0178: return _server.getIndex();
0179: }
0180:
0181: /**
0182: * Returns the hostname of the target server.
0183: */
0184: public String getAddress() {
0185: return _port.getAddress();
0186: }
0187:
0188: /**
0189: * Gets the port of the target server.
0190: */
0191: public int getPort() {
0192: return _port.getPort();
0193: }
0194:
0195: /**
0196: * Returns the time in milliseconds for the slow start throttling.
0197: */
0198: public long getLoadBalanceWarmupTime() {
0199: return _server.getLoadBalanceWarmupTime();
0200: }
0201:
0202: /**
0203: * Returns the socket timeout when connecting to the
0204: * target server.
0205: */
0206: public long getLoadBalanceConnectTimeout() {
0207: return _server.getLoadBalanceConnectTimeout();
0208: }
0209:
0210: /**
0211: * Returns the socket timeout when reading from the
0212: * target server.
0213: */
0214: public long getLoadBalanceSocketTimeout() {
0215: return _server.getLoadBalanceSocketTimeout();
0216: }
0217:
0218: /**
0219: * Returns how long the connection can be cached in the free pool.
0220: */
0221: public long getLoadBalanceIdleTime() {
0222: return _server.getLoadBalanceIdleTime();
0223: }
0224:
0225: /**
0226: * Returns how long the connection will be treated as dead.
0227: */
0228: public long getLoadBalanceRecoverTime() {
0229: return _server.getLoadBalanceRecoverTime();
0230: }
0231:
0232: /**
0233: * Returns the load balance weight.
0234: */
0235: public int getLoadBalanceWeight() {
0236: return _server.getLoadBalanceWeight();
0237: }
0238:
0239: /**
0240: * Initialize
0241: */
0242: public void init() throws Exception {
0243: _warmupTime = _server.getLoadBalanceWarmupTime();
0244: _warmupChunkTime = _warmupTime / WARMUP_MAX;
0245: if (_warmupChunkTime <= 0)
0246: _warmupChunkTime = 1;
0247:
0248: _failRecoverTime = _server.getLoadBalanceRecoverTime();
0249: _failChunkTime = _failRecoverTime / WARMUP_MAX;
0250: if (_failChunkTime <= 0)
0251: _failChunkTime = 1;
0252:
0253: _state = ST_STARTING;
0254:
0255: String address = getAddress();
0256:
0257: if (address == null)
0258: address = "localhost";
0259:
0260: HashMap<String, Object> attr = new HashMap<String, Object>();
0261: attr.put("connect-timeout", new Long(
0262: getLoadBalanceConnectTimeout()));
0263:
0264: if (_port.isSSL())
0265: _tcpPath = Vfs.lookup(
0266: "tcps://" + address + ":" + getPort(), attr);
0267: else
0268: _tcpPath = Vfs.lookup("tcp://" + address + ":" + getPort(),
0269: attr);
0270:
0271: _admin = new ServerConnectorAdmin(this );
0272:
0273: Thread thread = Thread.currentThread();
0274: ClassLoader oldLoader = thread.getContextClassLoader();
0275: try {
0276: Resin resin = Resin.getLocal();
0277:
0278: if (resin != null)
0279: thread.setContextClassLoader(resin.getClassLoader());
0280:
0281: String name = getId();
0282:
0283: if (name == null)
0284: name = "";
0285:
0286: } catch (Exception e) {
0287: log.log(Level.FINER, e.toString(), e);
0288: } finally {
0289: thread.setContextClassLoader(oldLoader);
0290: }
0291: }
0292:
0293: public void register() {
0294: _admin.register();
0295: }
0296:
0297: /**
0298: * Returns the number of active connections.
0299: */
0300: public int getActiveCount() {
0301: return _activeCount;
0302: }
0303:
0304: /**
0305: * Returns the number of idle connections.
0306: */
0307: public int getIdleCount() {
0308: return (_idleHead - _idleTail + _idle.length) % _idle.length;
0309: }
0310:
0311: /**
0312: * Returns the number of load balance allocations
0313: */
0314: public int getLoadBalanceAllocateCount() {
0315: return _loadBalanceAllocateCount;
0316: }
0317:
0318: /**
0319: * Allocate a connection for load balancing.
0320: */
0321: public void allocateLoadBalance() {
0322: synchronized (this ) {
0323: _loadBalanceAllocateCount++;
0324: }
0325: }
0326:
0327: /**
0328: * Free a connection for load balancing.
0329: */
0330: public void freeLoadBalance() {
0331: synchronized (this ) {
0332: _loadBalanceAllocateCount--;
0333: }
0334: }
0335:
0336: /**
0337: * Returns the total number of successful socket connections
0338: */
0339: public long getConnectCountTotal() {
0340: return _connectCountTotal;
0341: }
0342:
0343: /**
0344: * Returns the number of times a keepalive connection has been used.
0345: */
0346: public long getKeepaliveCountTotal() {
0347: return _keepaliveCountTotal;
0348: }
0349:
0350: /**
0351: * Returns the total number of failed connect attempts.
0352: */
0353: public long getFailCountTotal() {
0354: return _failCountTotal;
0355: }
0356:
0357: /**
0358: * Returns the time of the last failure.
0359: */
0360: public Date getLastFailTime() {
0361: return new Date(_lastFailTime);
0362: }
0363:
0364: /**
0365: * Returns the time of the last failure.
0366: */
0367: public Date getLastFailConnectTime() {
0368: return new Date(_lastFailConnectTime);
0369: }
0370:
0371: /**
0372: * Returns the time of the last failure.
0373: */
0374: public long getLastSuccessTime() {
0375: return _lastSuccessTime;
0376: }
0377:
0378: /**
0379: * Returns the latency factory
0380: */
0381: public double getLatencyFactor() {
0382: return _latencyFactor;
0383: }
0384:
0385: /**
0386: * Returns the count of busy connections.
0387: */
0388: public long getBusyCountTotal() {
0389: return _busyCountTotal;
0390: }
0391:
0392: /**
0393: * Returns the time of the last busy.
0394: */
0395: public Date getLastBusyTime() {
0396: return new Date(_lastBusyTime);
0397: }
0398:
0399: /**
0400: * Sets the CPU load avg (from backend).
0401: */
0402: public void setCpuLoadAvg(double load) {
0403: _cpuSetTime = Alarm.getCurrentTime();
0404: _cpuLoadAvg = load;
0405: }
0406:
0407: /**
0408: * Gets the CPU load avg
0409: */
0410: public double getCpuLoadAvg() {
0411: double avg = _cpuLoadAvg;
0412: long time = _cpuSetTime;
0413:
0414: long now = Alarm.getCurrentTime();
0415:
0416: if (now - time < 10000L)
0417: return avg;
0418: else
0419: return avg * 10000L / (now - time);
0420: }
0421:
0422: /**
0423: * Returns the debug id.
0424: */
0425: public String getDebugId() {
0426: if (_debugId == null) {
0427: String selfId = null;
0428: Cluster localCluster = Cluster.getLocal();
0429: if (localCluster != null)
0430: selfId = localCluster.getId();
0431:
0432: if (selfId == null || selfId.equals(""))
0433: selfId = "default";
0434:
0435: String targetId = _server.getId();
0436: if (targetId == null || targetId.equals(""))
0437: targetId = String.valueOf(_server.getIndex());
0438:
0439: _debugId = selfId + "->" + targetId;
0440: }
0441:
0442: return _debugId;
0443: }
0444:
0445: /**
0446: * Returns true if the server is active.
0447: */
0448: public final boolean isActive() {
0449: switch (_state) {
0450: case ST_ACTIVE:
0451: return true;
0452:
0453: case ST_STANDBY:
0454: case ST_CLOSED:
0455: return false;
0456:
0457: case ST_FAIL:
0458: return (_failTime + _failRecoverTime <= Alarm
0459: .getCurrentTime());
0460:
0461: default:
0462: return false;
0463: }
0464: }
0465:
0466: /**
0467: * Returns true if the server is dead.
0468: */
0469: public boolean isDead() {
0470: return !isActive();
0471: }
0472:
0473: /**
0474: * Enable the client
0475: */
0476: public void enable() {
0477: start();
0478: }
0479:
0480: /**
0481: * Disable the client
0482: */
0483: public void disable() {
0484: stop();
0485: }
0486:
0487: /**
0488: * Returns the lifecycle state.
0489: */
0490: public String getState() {
0491: updateWarmup();
0492:
0493: switch (_state) {
0494: case ST_NEW:
0495: return "init";
0496: case ST_STANDBY:
0497: return "standby";
0498: case ST_SESSION_ONLY:
0499: return "session-only";
0500: case ST_STARTING:
0501: return "starting";
0502: case ST_WARMUP:
0503: return "warmup";
0504: case ST_BUSY:
0505: return "busy";
0506: case ST_FAIL:
0507: return "fail";
0508: case ST_ACTIVE:
0509: return "active";
0510: case ST_CLOSED:
0511: return "closed";
0512: default:
0513: return "unknown(" + _state + ")";
0514: }
0515: }
0516:
0517: /**
0518: * Returns true if the server can open a connection.
0519: */
0520: public boolean canOpenSoftOrRecycle() {
0521: return getIdleCount() > 0 || canOpenSoft();
0522: }
0523:
0524: /**
0525: * Returns true if the server can open a connection.
0526: */
0527: public boolean canOpenSoft() {
0528: int state = _state;
0529:
0530: if (state == ST_ACTIVE)
0531: return true;
0532: else if (ST_STARTING <= state && state < ST_ACTIVE) {
0533: long now = Alarm.getCurrentTime();
0534:
0535: if (now < _lastFailConnectTime + _dynamicFailRecoverTime) {
0536: return false;
0537: }
0538:
0539: int warmupState = _warmupState;
0540:
0541: if (warmupState < 0) {
0542: return (_failTime - warmupState * _failChunkTime < now);
0543: } else if (WARMUP_MAX <= warmupState)
0544: return true;
0545:
0546: int connectionMax = WARMUP_CONNECTION_MAX[warmupState];
0547:
0548: int idleCount = getIdleCount();
0549: int activeCount = _activeCount + _startingCount;
0550: int totalCount = activeCount + idleCount;
0551:
0552: return totalCount < connectionMax;
0553: } else {
0554: return false;
0555: }
0556: }
0557:
0558: /**
0559: * Return true if active.
0560: */
0561: public boolean isEnabled() {
0562: int state = _state;
0563:
0564: return ST_STARTING <= state && state <= ST_ACTIVE;
0565: }
0566:
0567: private void toActive() {
0568: synchronized (this ) {
0569: if (_state < ST_CLOSED)
0570: _state = ST_ACTIVE;
0571: }
0572: }
0573:
0574: public void toBusy() {
0575: _lastBusyTime = Alarm.getCurrentTime();
0576: _firstSuccessTime = 0;
0577:
0578: synchronized (this ) {
0579: _busyCountTotal++;
0580:
0581: if (_state < ST_CLOSED)
0582: _state = ST_BUSY;
0583: }
0584: }
0585:
0586: public void toFail() {
0587: _failTime = Alarm.getCurrentTime();
0588: _lastFailTime = _failTime;
0589: _firstSuccessTime = 0;
0590:
0591: synchronized (this ) {
0592: _failCountTotal++;
0593:
0594: if (_state < ST_CLOSED)
0595: _state = ST_FAIL;
0596: }
0597:
0598: clearRecycle();
0599: }
0600:
0601: /**
0602: * Called when the socket read/write fails.
0603: */
0604: public void failSocket() {
0605: synchronized (this ) {
0606: _failCountTotal++;
0607:
0608: long now = Alarm.getCurrentTime();
0609: _firstSuccessTime = 0;
0610:
0611: // only degrade one per 100ms
0612: if (now - _failTime >= 100) {
0613: _warmupState--;
0614: _failTime = now;
0615: _lastFailTime = _failTime;
0616: }
0617:
0618: if (_warmupState < WARMUP_MIN)
0619: _warmupState = WARMUP_MIN;
0620:
0621: if (_state < ST_CLOSED)
0622: _state = ST_FAIL;
0623: }
0624: }
0625:
0626: /**
0627: * Called when the socket read/write fails.
0628: */
0629: public void failConnect() {
0630: synchronized (this ) {
0631: _failCountTotal++;
0632:
0633: _firstSuccessTime = 0;
0634:
0635: // only degrade one per 100ms
0636: _warmupState--;
0637: long now = Alarm.getCurrentTime();
0638: _failTime = now;
0639: _lastFailTime = _failTime;
0640: _lastFailConnectTime = now;
0641: _dynamicFailRecoverTime *= 2;
0642: if (_failRecoverTime < _dynamicFailRecoverTime)
0643: _dynamicFailRecoverTime = _failRecoverTime;
0644:
0645: if (_warmupState < WARMUP_MIN)
0646: _warmupState = WARMUP_MIN;
0647:
0648: if (_state < ST_CLOSED)
0649: _state = ST_FAIL;
0650: }
0651: }
0652:
0653: /**
0654: * Called when the server responds with "busy", e.g. HTTP 503
0655: */
0656: public void busy() {
0657: synchronized (this ) {
0658: _lastBusyTime = Alarm.getCurrentTime();
0659: _firstSuccessTime = 0;
0660:
0661: _warmupState--;
0662: if (_warmupState < 0)
0663: _warmupState = 0;
0664:
0665: _busyCountTotal++;
0666:
0667: if (_state < ST_CLOSED)
0668: _state = ST_BUSY;
0669: }
0670: }
0671:
0672: /**
0673: * Enable the client.
0674: */
0675: public void start() {
0676: synchronized (this ) {
0677: if (_state == ST_ACTIVE) {
0678: } else if (_state < ST_CLOSED)
0679: _state = ST_STARTING;
0680: }
0681: }
0682:
0683: /**
0684: * Disable the client.
0685: */
0686: public void stop() {
0687: synchronized (this ) {
0688: if (_state < ST_CLOSED)
0689: _state = ST_STANDBY;
0690: }
0691: }
0692:
0693: /**
0694: * Session only
0695: */
0696: public void enableSessionOnly() {
0697: synchronized (this ) {
0698: if (_state < ST_CLOSED && _state != ST_STANDBY)
0699: _state = ST_SESSION_ONLY;
0700: }
0701: }
0702:
0703: /**
0704: * Open a stream to the target server.
0705: *
0706: * @return the socket's read/write pair.
0707: */
0708: public ClusterStream openSoft() {
0709: int state = _state;
0710:
0711: if (!(ST_STARTING <= state && state <= ST_ACTIVE)) {
0712: return null;
0713: }
0714:
0715: ClusterStream stream = openRecycle();
0716:
0717: if (stream != null)
0718: return stream;
0719:
0720: if (canOpenSoft()) {
0721: return connect();
0722: } else {
0723: return null;
0724: }
0725: }
0726:
0727: /**
0728: * Open a stream to the target server object persistence.
0729: *
0730: * @return the socket's read/write pair.
0731: */
0732: public ClusterStream openIfLive() {
0733: if (_state == ST_CLOSED) {
0734: return null;
0735: }
0736:
0737: ClusterStream stream = openRecycle();
0738:
0739: if (stream != null)
0740: return stream;
0741:
0742: long now = Alarm.getCurrentTime();
0743:
0744: if (now < _failTime + _failRecoverTime) {
0745: return null;
0746: }
0747:
0748: return connect();
0749: }
0750:
0751: /**
0752: * Open a stream to the target server for a session.
0753: *
0754: * @return the socket's read/write pair.
0755: */
0756: public ClusterStream openForSession() {
0757: int state = _state;
0758: if (!(ST_SESSION_ONLY <= state && state < ST_CLOSED)) {
0759: return null;
0760: }
0761:
0762: ClusterStream stream = openRecycle();
0763:
0764: if (stream != null)
0765: return stream;
0766:
0767: long now = Alarm.getCurrentTime();
0768:
0769: if (now < _failTime + _failRecoverTime) {
0770: return null;
0771: }
0772:
0773: if (now < _lastBusyTime + _failRecoverTime) {
0774: return null;
0775: }
0776:
0777: return connect();
0778: }
0779:
0780: /**
0781: * Open a stream to the target server for the load balancer.
0782: *
0783: * @return the socket's read/write pair.
0784: */
0785: public ClusterStream open() {
0786: int state = _state;
0787: if (!(ST_STARTING <= state && state < ST_CLOSED))
0788: return null;
0789:
0790: ClusterStream stream = openRecycle();
0791:
0792: if (stream != null)
0793: return stream;
0794:
0795: return connect();
0796: }
0797:
0798: /**
0799: * Returns a valid recycled stream from the idle pool to the backend.
0800: *
0801: * If the stream has been in the pool for too long (> live_time),
0802: * close it instead.
0803: *
0804: * @return the socket's read/write pair.
0805: */
0806: private ClusterStream openRecycle() {
0807: long now = Alarm.getCurrentTime();
0808: ClusterStream stream = null;
0809:
0810: synchronized (this ) {
0811: if (_idleHead != _idleTail) {
0812: stream = _idle[_idleHead];
0813: long freeTime = stream.getFreeTime();
0814:
0815: _idle[_idleHead] = null;
0816: _idleHead = (_idleHead + _idle.length - 1)
0817: % _idle.length;
0818:
0819: if (now < freeTime + _server.getLoadBalanceIdleTime()) {
0820: _activeCount++;
0821: _keepaliveCountTotal++;
0822:
0823: return stream;
0824: }
0825: }
0826: }
0827:
0828: if (stream != null)
0829: stream.closeImpl();
0830:
0831: return null;
0832: }
0833:
0834: /**
0835: * Connect to the backend server.
0836: *
0837: * @return the socket's read/write pair.
0838: */
0839: private ClusterStream connect() {
0840: synchronized (this ) {
0841: if (_maxConnections <= _activeCount + _startingCount)
0842: return null;
0843:
0844: _startingCount++;
0845: }
0846:
0847: try {
0848: ReadWritePair pair = openTCPPair();
0849: ReadStream rs = pair.getReadStream();
0850: rs.setAttribute("timeout", new Integer(
0851: (int) getLoadBalanceSocketTimeout()));
0852:
0853: synchronized (this ) {
0854: _activeCount++;
0855: _connectCountTotal++;
0856: }
0857:
0858: ClusterStream stream = new ClusterStream(_streamCount++,
0859: this , rs, pair.getWriteStream());
0860:
0861: if (log.isLoggable(Level.FINER))
0862: log.finer("connect " + stream);
0863:
0864: if (_firstSuccessTime <= 0) {
0865: if (ST_STARTING <= _state && _state < ST_ACTIVE) {
0866: if (_warmupTime > 0)
0867: _state = ST_WARMUP;
0868: else
0869: _state = ST_ACTIVE;
0870:
0871: _firstSuccessTime = Alarm.getCurrentTime();
0872: }
0873:
0874: if (_warmupState < 0)
0875: _warmupState = 0;
0876: }
0877:
0878: return stream;
0879: } catch (IOException e) {
0880: log.log(Level.FINER, e.toString(), e);
0881:
0882: failConnect();
0883:
0884: return null;
0885: } finally {
0886: synchronized (this ) {
0887: _startingCount--;
0888: }
0889: }
0890: }
0891:
0892: /**
0893: * We now know that the server is live, e.g. if a sibling has
0894: * contacted us.
0895: */
0896: public void wake() {
0897: synchronized (this ) {
0898: if (_state == ST_FAIL) {
0899: _state = ST_STARTING;
0900: }
0901:
0902: _failTime = 0;
0903: }
0904: }
0905:
0906: /**
0907: * Free the read/write pair for reuse. Called only from
0908: * ClusterStream.free()
0909: */
0910: void free(ClusterStream stream) {
0911: synchronized (this ) {
0912: _activeCount--;
0913:
0914: int size = (_idleHead - _idleTail + _idle.length)
0915: % _idle.length;
0916:
0917: if (_state != ST_CLOSED && size < _idleSize) {
0918: _idleHead = (_idleHead + 1) % _idle.length;
0919: _idle[_idleHead] = stream;
0920:
0921: stream = null;
0922: }
0923:
0924: long now = Alarm.getCurrentTime();
0925:
0926: long prevSuccessTime = _prevSuccessTime;
0927:
0928: if (prevSuccessTime > 0) {
0929: _latencyFactor = (0.95 * _latencyFactor + 0.05 * (now - prevSuccessTime));
0930: }
0931:
0932: if (_activeCount > 0)
0933: _prevSuccessTime = now;
0934: else
0935: _prevSuccessTime = 0;
0936:
0937: _lastSuccessTime = now;
0938: }
0939:
0940: updateWarmup();
0941:
0942: long now = Alarm.getCurrentTime();
0943: long maxIdleTime = _server.getLoadBalanceIdleTime();
0944: ClusterStream oldStream = null;
0945:
0946: do {
0947: oldStream = null;
0948:
0949: synchronized (this ) {
0950: if (_idleHead != _idleTail) {
0951: int nextTail = (_idleTail + 1) % _idle.length;
0952:
0953: oldStream = _idle[nextTail];
0954:
0955: if (oldStream != null
0956: && oldStream.getFreeTime() + maxIdleTime < now) {
0957: _idle[nextTail] = null;
0958: _idleTail = nextTail;
0959: } else
0960: oldStream = null;
0961: }
0962: }
0963:
0964: if (oldStream != null)
0965: oldStream.closeImpl();
0966: } while (oldStream != null);
0967:
0968: if (stream != null)
0969: stream.closeImpl();
0970: }
0971:
0972: private void updateWarmup() {
0973: synchronized (this ) {
0974: if (!isEnabled())
0975: return;
0976:
0977: long now = Alarm.getCurrentTime();
0978: int warmupState = _warmupState;
0979:
0980: if (warmupState >= 0 && _firstSuccessTime > 0) {
0981: warmupState = (int) ((now - _firstSuccessTime) / _warmupChunkTime);
0982:
0983: // reset the connection fail recover time
0984: _dynamicFailRecoverTime = 1000L;
0985:
0986: if (WARMUP_MAX <= warmupState) {
0987: warmupState = WARMUP_MAX;
0988: toActive();
0989: }
0990: }
0991:
0992: _warmupState = warmupState;
0993: }
0994: }
0995:
0996: /**
0997: * Closes the read/write pair for reuse. Called only
0998: * from ClusterStream.close().
0999: */
1000: void close(ClusterStream stream) {
1001: if (log.isLoggable(Level.FINER))
1002: log.finer("close " + stream);
1003:
1004: synchronized (this ) {
1005: _activeCount--;
1006: }
1007: }
1008:
1009: /**
1010: * Clears the recycled connections, e.g. on detection of backend
1011: * server going down.
1012: */
1013: public void clearRecycle() {
1014: ArrayList<ClusterStream> recycleList = null;
1015:
1016: synchronized (this ) {
1017: _idleHead = _idleTail = 0;
1018:
1019: for (int i = 0; i < _idle.length; i++) {
1020: ClusterStream stream;
1021:
1022: stream = _idle[i];
1023: _idle[i] = null;
1024:
1025: if (stream != null) {
1026: if (recycleList == null)
1027: recycleList = new ArrayList<ClusterStream>();
1028:
1029: recycleList.add(stream);
1030: }
1031: }
1032: }
1033:
1034: if (recycleList != null) {
1035: for (ClusterStream stream : recycleList) {
1036: stream.closeImpl();
1037: }
1038: }
1039: }
1040:
1041: /**
1042: * Close the client
1043: */
1044: public void close() {
1045: synchronized (this ) {
1046: if (_state == ST_CLOSED)
1047: return;
1048:
1049: _state = ST_CLOSED;
1050: }
1051:
1052: synchronized (this ) {
1053: _idleHead = _idleTail = 0;
1054: }
1055:
1056: for (int i = 0; i < _idle.length; i++) {
1057: ClusterStream stream;
1058:
1059: synchronized (this ) {
1060: stream = _idle[i];
1061: _idle[i] = null;
1062: }
1063:
1064: if (stream != null)
1065: stream.closeImpl();
1066: }
1067: }
1068:
1069: /**
1070: * Open a read/write pair to the target srun connection.
1071: *
1072: * @return the socket's read/write pair.
1073: */
1074: ReadWritePair openTCPPair() throws IOException {
1075: return _tcpPath.openReadWrite();
1076: }
1077:
1078: /**
1079: * Returns true if can connect to the client.
1080: */
1081: public boolean canConnect() {
1082: try {
1083: wake();
1084:
1085: ClusterStream stream = open();
1086:
1087: if (stream != null) {
1088: stream.free();
1089:
1090: return true;
1091: }
1092:
1093: return false;
1094: } catch (Exception e) {
1095: log.log(Level.FINER, e.toString(), e);
1096:
1097: return false;
1098: }
1099: }
1100:
1101: @Override
1102: public String toString() {
1103: return ("ServerConnector[id=" + getId() + " index="
1104: + _port.getIndex() + " address=" + _port.getAddress()
1105: + ":" + _port.getPort() + " cluster="
1106: + _cluster.getId() + "]");
1107: }
1108:
1109: }
|