0001: /*
0002: * Licensed to the Apache Software Foundation (ASF) under one or more
0003: * contributor license agreements. See the NOTICE file distributed with
0004: * this work for additional information regarding copyright ownership.
0005: * The ASF licenses this file to You under the Apache License, Version 2.0
0006: * (the "License"); you may not use this file except in compliance with
0007: * the License. You may obtain a copy of the License at
0008: *
0009: * http://www.apache.org/licenses/LICENSE-2.0
0010: *
0011: * Unless required by applicable law or agreed to in writing, software
0012: * distributed under the License is distributed on an "AS IS" BASIS,
0013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014: * See the License for the specific language governing permissions and
0015: * limitations under the License.
0016: */
0017:
0018: package org.apache.jk.common;
0019:
0020: import java.util.Set;
0021: import java.util.Iterator;
0022: import java.io.IOException;
0023: import java.io.InputStream;
0024: import java.io.OutputStream;
0025: import java.nio.ByteBuffer;
0026: import java.nio.channels.Selector;
0027: import java.nio.channels.SelectionKey;
0028: import java.nio.channels.SocketChannel;
0029: import java.nio.channels.ClosedSelectorException;
0030: import java.nio.channels.ServerSocketChannel;
0031: import java.nio.channels.CancelledKeyException;
0032: import java.nio.channels.ClosedChannelException;
0033: import java.net.URLEncoder;
0034: import java.net.InetAddress;
0035: import java.net.InetSocketAddress;
0036: import java.net.ServerSocket;
0037: import java.net.Socket;
0038: import java.net.SocketException;
0039:
0040: import javax.management.ListenerNotFoundException;
0041: import javax.management.MBeanNotificationInfo;
0042: import javax.management.Notification;
0043: import javax.management.NotificationBroadcaster;
0044: import javax.management.NotificationBroadcasterSupport;
0045: import javax.management.NotificationFilter;
0046: import javax.management.NotificationListener;
0047: import javax.management.ObjectName;
0048:
0049: import org.apache.jk.core.JkHandler;
0050: import org.apache.jk.core.Msg;
0051: import org.apache.jk.core.MsgContext;
0052: import org.apache.jk.core.JkChannel;
0053: import org.apache.jk.core.WorkerEnv;
0054: import org.apache.coyote.Request;
0055: import org.apache.coyote.RequestGroupInfo;
0056: import org.apache.coyote.RequestInfo;
0057: import org.apache.tomcat.util.modeler.Registry;
0058: import org.apache.tomcat.util.threads.ThreadPool;
0059: import org.apache.tomcat.util.threads.ThreadPoolRunnable;
0060:
0061: /**
0062: * Accept ( and send ) TCP messages.
0063: *
0064: * @author Costin Manolache
0065: * @author Bill Barker
0066: * jmx:mbean name="jk:service=ChannelNioSocket"
0067: * description="Accept socket connections"
0068: * jmx:notification name="org.apache.coyote.INVOKE
0069: * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
0070: * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
0071: * jmx:notification-handler name="org.apache.jk.JK_FLUSH
0072: *
0073: * Jk can use multiple protocols/transports.
0074: * Various container adapters should load this object ( as a bean ),
0075: * set configurations and use it. Note that the connector will handle
0076: * all incoming protocols - it's not specific to ajp1x. The protocol
0077: * is abstracted by MsgContext/Message/Channel.
0078: *
0079: * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
0080: * TCP, Ajp14 API etc.
0081: * As we add other protocols/transports/APIs this will change, the current goal
0082: * is to get the same level of functionality as in the original jk connector.
0083: *
0084: * XXX Make the 'message type' pluggable
0085: */
0086: public class ChannelNioSocket extends JkHandler implements
0087: NotificationBroadcaster, JkChannel {
0088: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
0089: .getLog(ChannelNioSocket.class);
0090:
0091: private int startPort = 8009;
0092: private int maxPort = 8019; // 0 for backward compat.
0093: private int port = startPort;
0094: private InetAddress inet;
0095: private int serverTimeout = 0;
0096: private boolean tcpNoDelay = true; // nodelay to true by default
0097: private int linger = 100;
0098: private int socketTimeout = 0;
0099: private boolean nioIsBroken = false;
0100: private Selector selector = null;
0101: private int bufferSize = 8 * 1024;
0102: private int packetSize = 8 * 1024;
0103:
0104: private long requestCount = 0;
0105:
0106: /* Turning this to true will reduce the latency with about 20%.
0107: But it requires changes in tomcat to make sure client-requested
0108: flush() is honored ( on my test, I got 367->433 RPS and
0109: 52->35ms average time with a simple servlet )
0110: */
0111:
0112: ThreadPool tp = ThreadPool.createThreadPool(true);
0113:
0114: /* ==================== Tcp socket options ==================== */
0115:
0116: /**
0117: * jmx:managed-constructor description="default constructor"
0118: */
0119: public ChannelNioSocket() {
0120: // This should be integrated with the domain setup
0121: }
0122:
0123: public ThreadPool getThreadPool() {
0124: return tp;
0125: }
0126:
0127: public long getRequestCount() {
0128: return requestCount;
0129: }
0130:
0131: /** Set the port for the ajp13 channel.
0132: * To support seemless load balancing and jni, we treat this
0133: * as the 'base' port - we'll try up until we find one that is not
0134: * used. We'll also provide the 'difference' to the main coyote
0135: * handler - that will be our 'sessionID' and the position in
0136: * the scoreboard and the suffix for the unix domain socket.
0137: *
0138: * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
0139: */
0140: public void setPort(int port) {
0141: this .startPort = port;
0142: this .port = port;
0143: this .maxPort = port + 10;
0144: }
0145:
0146: public int getPort() {
0147: return port;
0148: }
0149:
0150: public void setAddress(InetAddress inet) {
0151: this .inet = inet;
0152: }
0153:
0154: public void setBufferSize(int bs) {
0155: if (bs > 8 * 1024) {
0156: bufferSize = bs;
0157: }
0158: }
0159:
0160: public int getBufferSize() {
0161: return bufferSize;
0162: }
0163:
0164: public void setPacketSize(int ps) {
0165: if (ps < 8 * 1024) {
0166: ps = 8 * 1024;
0167: }
0168: packetSize = ps;
0169: }
0170:
0171: public int getPacketSize() {
0172: return packetSize;
0173: }
0174:
0175: /**
0176: * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
0177: */
0178: public void setAddress(String inet) {
0179: try {
0180: this .inet = InetAddress.getByName(inet);
0181: } catch (Exception ex) {
0182: log.error("Error parsing " + inet, ex);
0183: }
0184: }
0185:
0186: public String getAddress() {
0187: if (inet != null)
0188: return inet.toString();
0189: return "/0.0.0.0";
0190: }
0191:
0192: /**
0193: * Sets the timeout in ms of the server sockets created by this
0194: * server. This method allows the developer to make servers
0195: * more or less responsive to having their server sockets
0196: * shut down.
0197: *
0198: * <p>By default this value is 1000ms.
0199: */
0200: public void setServerTimeout(int timeout) {
0201: this .serverTimeout = timeout;
0202: }
0203:
0204: public int getServerTimeout() {
0205: return serverTimeout;
0206: }
0207:
0208: public void setTcpNoDelay(boolean b) {
0209: tcpNoDelay = b;
0210: }
0211:
0212: public boolean getTcpNoDelay() {
0213: return tcpNoDelay;
0214: }
0215:
0216: public void setSoLinger(int i) {
0217: linger = i;
0218: }
0219:
0220: public int getSoLinger() {
0221: return linger;
0222: }
0223:
0224: public void setSoTimeout(int i) {
0225: socketTimeout = i;
0226: }
0227:
0228: public int getSoTimeout() {
0229: return socketTimeout;
0230: }
0231:
0232: public void setMaxPort(int i) {
0233: maxPort = i;
0234: }
0235:
0236: public int getMaxPort() {
0237: return maxPort;
0238: }
0239:
0240: /** At startup we'll look for the first free port in the range.
0241: The difference between this port and the beggining of the range
0242: is the 'id'.
0243: This is usefull for lb cases ( less config ).
0244: */
0245: public int getInstanceId() {
0246: return port - startPort;
0247: }
0248:
0249: /** If set to false, the thread pool will be created in
0250: * non-daemon mode, and will prevent main from exiting
0251: */
0252: public void setDaemon(boolean b) {
0253: tp.setDaemon(b);
0254: }
0255:
0256: public boolean getDaemon() {
0257: return tp.getDaemon();
0258: }
0259:
0260: public void setMaxThreads(int i) {
0261: if (log.isDebugEnabled())
0262: log.debug("Setting maxThreads " + i);
0263: tp.setMaxThreads(i);
0264: }
0265:
0266: public void setMinSpareThreads(int i) {
0267: if (log.isDebugEnabled())
0268: log.debug("Setting minSpareThreads " + i);
0269: tp.setMinSpareThreads(i);
0270: }
0271:
0272: public void setMaxSpareThreads(int i) {
0273: if (log.isDebugEnabled())
0274: log.debug("Setting maxSpareThreads " + i);
0275: tp.setMaxSpareThreads(i);
0276: }
0277:
0278: public int getMaxThreads() {
0279: return tp.getMaxThreads();
0280: }
0281:
0282: public int getMinSpareThreads() {
0283: return tp.getMinSpareThreads();
0284: }
0285:
0286: public int getMaxSpareThreads() {
0287: return tp.getMaxSpareThreads();
0288: }
0289:
0290: public void setBacklog(int i) {
0291: }
0292:
0293: public void setNioIsBroken(boolean nib) {
0294: nioIsBroken = nib;
0295: }
0296:
0297: public boolean getNioIsBroken() {
0298: return nioIsBroken;
0299: }
0300:
0301: /* ==================== ==================== */
0302: ServerSocket sSocket;
0303: final int socketNote = 1;
0304: final int isNote = 2;
0305: final int osNote = 3;
0306: final int notifNote = 4;
0307: boolean paused = false;
0308:
0309: public void pause() throws Exception {
0310: synchronized (this ) {
0311: paused = true;
0312: }
0313: }
0314:
0315: public void resume() {
0316: synchronized (this ) {
0317: paused = false;
0318: notify();
0319: }
0320: }
0321:
0322: public void accept(MsgContext ep) throws IOException {
0323: if (sSocket == null)
0324: return;
0325: synchronized (this ) {
0326: while (paused) {
0327: try {
0328: wait();
0329: } catch (InterruptedException ie) {
0330: //Ignore, since can't happen
0331: }
0332: }
0333: }
0334: SocketChannel sc = sSocket.getChannel().accept();
0335: Socket s = sc.socket();
0336: ep.setNote(socketNote, s);
0337: if (log.isDebugEnabled())
0338: log.debug("Accepted socket " + s + " channel "
0339: + sc.isBlocking());
0340:
0341: try {
0342: setSocketOptions(s);
0343: } catch (SocketException sex) {
0344: log.debug("Error initializing Socket Options", sex);
0345: }
0346:
0347: requestCount++;
0348:
0349: sc.configureBlocking(false);
0350: InputStream is = new SocketInputStream(sc);
0351: OutputStream os = new SocketOutputStream(sc);
0352: ep.setNote(isNote, is);
0353: ep.setNote(osNote, os);
0354: ep.setControl(tp);
0355: }
0356:
0357: private void setSocketOptions(Socket s) throws SocketException {
0358: if (socketTimeout > 0)
0359: s.setSoTimeout(socketTimeout);
0360:
0361: s.setTcpNoDelay(tcpNoDelay); // set socket tcpnodelay state
0362:
0363: if (linger > 0)
0364: s.setSoLinger(true, linger);
0365: }
0366:
0367: public void resetCounters() {
0368: requestCount = 0;
0369: }
0370:
0371: /** Called after you change some fields at runtime using jmx.
0372: Experimental for now.
0373: */
0374: public void reinit() throws IOException {
0375: destroy();
0376: init();
0377: }
0378:
0379: /**
0380: * jmx:managed-operation
0381: */
0382: public void init() throws IOException {
0383: // Find a port.
0384: if (startPort == 0) {
0385: port = 0;
0386: if (log.isInfoEnabled())
0387: log.info("JK: ajp13 disabling channelNioSocket");
0388: running = true;
0389: return;
0390: }
0391: if (maxPort < startPort)
0392: maxPort = startPort;
0393: ServerSocketChannel ssc = ServerSocketChannel.open();
0394: ssc.configureBlocking(false);
0395: for (int i = startPort; i <= maxPort; i++) {
0396: try {
0397: InetSocketAddress iddr = null;
0398: if (inet == null) {
0399: iddr = new InetSocketAddress(i);
0400: } else {
0401: iddr = new InetSocketAddress(inet, i);
0402: }
0403: sSocket = ssc.socket();
0404: sSocket.bind(iddr);
0405: port = i;
0406: break;
0407: } catch (IOException ex) {
0408: if (log.isInfoEnabled())
0409: log.info("Port busy " + i + " " + ex.toString());
0410: sSocket = null;
0411: }
0412: }
0413:
0414: if (sSocket == null) {
0415: log.error("Can't find free port " + startPort + " "
0416: + maxPort);
0417: return;
0418: }
0419: if (log.isInfoEnabled())
0420: log.info("JK: ajp13 listening on " + getAddress() + ":"
0421: + port);
0422:
0423: selector = Selector.open();
0424: ssc.register(selector, SelectionKey.OP_ACCEPT);
0425: // If this is not the base port and we are the 'main' channleSocket and
0426: // SHM didn't already set the localId - we'll set the instance id
0427: if ("channelNioSocket".equals(name) && port != startPort
0428: && (wEnv.getLocalId() == 0)) {
0429: wEnv.setLocalId(port - startPort);
0430: }
0431:
0432: // XXX Reverse it -> this is a notification generator !!
0433: if (next == null && wEnv != null) {
0434: if (nextName != null)
0435: setNext(wEnv.getHandler(nextName));
0436: if (next == null)
0437: next = wEnv.getHandler("dispatch");
0438: if (next == null)
0439: next = wEnv.getHandler("request");
0440: }
0441: JMXRequestNote = wEnv.getNoteId(WorkerEnv.ENDPOINT_NOTE,
0442: "requestNote");
0443: running = true;
0444:
0445: // Run a thread that will accept connections.
0446: // XXX Try to find a thread first - not sure how...
0447: if (this .domain != null) {
0448: try {
0449: tpOName = new ObjectName(domain
0450: + ":type=ThreadPool,name=" + getChannelName());
0451:
0452: Registry.getRegistry(null, null).registerComponent(tp,
0453: tpOName, null);
0454:
0455: rgOName = new ObjectName(domain
0456: + ":type=GlobalRequestProcessor,name="
0457: + getChannelName());
0458: Registry.getRegistry(null, null).registerComponent(
0459: global, rgOName, null);
0460: } catch (Exception e) {
0461: log.error("Can't register threadpool");
0462: }
0463: }
0464:
0465: tp.start();
0466: Poller pollAjp = new Poller();
0467: tp.runIt(pollAjp);
0468: }
0469:
0470: ObjectName tpOName;
0471: ObjectName rgOName;
0472: RequestGroupInfo global = new RequestGroupInfo();
0473: int JMXRequestNote;
0474:
0475: public void start() throws IOException {
0476: if (sSocket == null)
0477: init();
0478: resume();
0479: }
0480:
0481: public void stop() throws IOException {
0482: destroy();
0483: }
0484:
0485: public void registerRequest(Request req, MsgContext ep, int count) {
0486: if (this .domain != null) {
0487: try {
0488: RequestInfo rp = req.getRequestProcessor();
0489: rp.setGlobalProcessor(global);
0490: ObjectName roname = new ObjectName(getDomain()
0491: + ":type=RequestProcessor,worker="
0492: + getChannelName() + ",name=JkRequest" + count);
0493: ep.setNote(JMXRequestNote, roname);
0494:
0495: Registry.getRegistry(null, null).registerComponent(rp,
0496: roname, null);
0497: } catch (Exception ex) {
0498: log.warn("Error registering request");
0499: }
0500: }
0501: }
0502:
0503: public void open(MsgContext ep) throws IOException {
0504: }
0505:
0506: public void close(MsgContext ep) throws IOException {
0507: Socket s = (Socket) ep.getNote(socketNote);
0508: SelectionKey key = s.getChannel().keyFor(selector);
0509: if (key != null) {
0510: key.cancel();
0511: }
0512: s.close();
0513: }
0514:
0515: public void destroy() throws IOException {
0516: running = false;
0517: try {
0518: /* If we disabled the channel return */
0519: if (port == 0)
0520: return;
0521: tp.shutdown();
0522:
0523: selector.wakeup().close();
0524: sSocket.close(); // XXX?
0525:
0526: if (tpOName != null) {
0527: Registry.getRegistry(null, null).unregisterComponent(
0528: tpOName);
0529: }
0530: if (rgOName != null) {
0531: Registry.getRegistry(null, null).unregisterComponent(
0532: rgOName);
0533: }
0534: } catch (Exception e) {
0535: log.info("Error shutting down the channel " + port + " "
0536: + e.toString());
0537: if (log.isDebugEnabled())
0538: log.debug("Trace", e);
0539: }
0540: }
0541:
0542: public int send(Msg msg, MsgContext ep) throws IOException {
0543: msg.end(); // Write the packet header
0544: byte buf[] = msg.getBuffer();
0545: int len = msg.getLen();
0546:
0547: if (log.isTraceEnabled())
0548: log.trace("send() " + len + " " + buf[4]);
0549:
0550: OutputStream os = (OutputStream) ep.getNote(osNote);
0551: os.write(buf, 0, len);
0552: return len;
0553: }
0554:
0555: public int flush(Msg msg, MsgContext ep) throws IOException {
0556: OutputStream os = (OutputStream) ep.getNote(osNote);
0557: os.flush();
0558: return 0;
0559: }
0560:
0561: public int receive(Msg msg, MsgContext ep) throws IOException {
0562: if (log.isTraceEnabled()) {
0563: log.trace("receive() ");
0564: }
0565:
0566: byte buf[] = msg.getBuffer();
0567: int hlen = msg.getHeaderLength();
0568:
0569: // XXX If the length in the packet header doesn't agree with the
0570: // actual number of bytes read, it should probably return an error
0571: // value. Also, callers of this method never use the length
0572: // returned -- should probably return true/false instead.
0573:
0574: int rd = this .read(ep, buf, 0, hlen);
0575:
0576: if (rd < 0) {
0577: // Most likely normal apache restart.
0578: // log.warn("Wrong message " + rd );
0579: return rd;
0580: }
0581:
0582: msg.processHeader();
0583:
0584: /* After processing the header we know the body
0585: length
0586: */
0587: int blen = msg.getLen();
0588:
0589: // XXX check if enough space - it's assert()-ed !!!
0590:
0591: int total_read = 0;
0592:
0593: total_read = this .read(ep, buf, hlen, blen);
0594:
0595: if ((total_read <= 0) && (blen > 0)) {
0596: log.warn("can't read body, waited #" + blen);
0597: return -1;
0598: }
0599:
0600: if (total_read != blen) {
0601: log.warn("incomplete read, waited #" + blen + " got only "
0602: + total_read);
0603: return -2;
0604: }
0605:
0606: return total_read;
0607: }
0608:
0609: /**
0610: * Read N bytes from the InputStream, and ensure we got them all
0611: * Under heavy load we could experience many fragmented packets
0612: * just read Unix Network Programming to recall that a call to
0613: * read didn't ensure you got all the data you want
0614: *
0615: * from read() Linux manual
0616: *
0617: * On success, the number of bytes read is returned (zero indicates end
0618: * of file),and the file position is advanced by this number.
0619: * It is not an error if this number is smaller than the number of bytes
0620: * requested; this may happen for example because fewer bytes
0621: * are actually available right now (maybe because we were close to
0622: * end-of-file, or because we are reading from a pipe, or from a
0623: * terminal), or because read() was interrupted by a signal.
0624: * On error, -1 is returned, and errno is set appropriately. In this
0625: * case it is left unspecified whether the file position (if any) changes.
0626: *
0627: **/
0628: public int read(MsgContext ep, byte[] b, int offset, int len)
0629: throws IOException {
0630: InputStream is = (InputStream) ep.getNote(isNote);
0631: int pos = 0;
0632: int got;
0633:
0634: while (pos < len) {
0635: try {
0636: got = is.read(b, pos + offset, len - pos);
0637: } catch (ClosedChannelException sex) {
0638: if (pos > 0) {
0639: log.info("Error reading data after " + pos
0640: + "bytes", sex);
0641: } else {
0642: log.debug("Error reading data", sex);
0643: }
0644: got = -1;
0645: }
0646: if (log.isTraceEnabled()) {
0647: log.trace("read() " + b + " "
0648: + (b == null ? 0 : b.length) + " " + offset
0649: + " " + len + " = " + got);
0650: }
0651:
0652: // connection just closed by remote.
0653: if (got <= 0) {
0654: // This happens periodically, as apache restarts
0655: // periodically.
0656: // It should be more gracefull ! - another feature for Ajp14
0657: // log.warn( "server has closed the current connection (-1)" );
0658: return -3;
0659: }
0660:
0661: pos += got;
0662: }
0663: return pos;
0664: }
0665:
0666: protected boolean running = true;
0667:
0668: /** Accept incoming connections, dispatch to the thread pool
0669: */
0670: void acceptConnections() {
0671: if (running) {
0672: try {
0673: MsgContext ep = createMsgContext();
0674: ep.setSource(this );
0675: ep.setWorkerEnv(wEnv);
0676: this .accept(ep);
0677:
0678: if (!running)
0679: return;
0680:
0681: // Since this is a long-running connection, we don't care
0682: // about the small GC
0683: SocketConnection ajpConn = new SocketConnection(ep);
0684: ajpConn.register(ep);
0685: } catch (Exception ex) {
0686: if (running)
0687: log.warn("Exception executing accept", ex);
0688: }
0689: }
0690: }
0691:
0692: // XXX This should become handleNotification
0693: public int invoke(Msg msg, MsgContext ep) throws IOException {
0694: int type = ep.getType();
0695:
0696: switch (type) {
0697: case JkHandler.HANDLE_RECEIVE_PACKET:
0698: if (log.isDebugEnabled())
0699: log.debug("RECEIVE_PACKET ?? ");
0700: return receive(msg, ep);
0701: case JkHandler.HANDLE_SEND_PACKET:
0702: return send(msg, ep);
0703: case JkHandler.HANDLE_FLUSH:
0704: return flush(msg, ep);
0705: }
0706:
0707: if (log.isTraceEnabled())
0708: log.trace("Call next " + type + " " + next);
0709:
0710: // Send notification
0711: if (nSupport != null) {
0712: Notification notif = (Notification) ep.getNote(notifNote);
0713: if (notif == null) {
0714: notif = new Notification("channelNioSocket.message",
0715: ep, requestCount);
0716: ep.setNote(notifNote, notif);
0717: }
0718: nSupport.sendNotification(notif);
0719: }
0720:
0721: if (next != null) {
0722: return next.invoke(msg, ep);
0723: } else {
0724: log.info("No next ");
0725: }
0726:
0727: return OK;
0728: }
0729:
0730: public boolean isSameAddress(MsgContext ep) {
0731: Socket s = (Socket) ep.getNote(socketNote);
0732: return isSameAddress(s.getLocalAddress(), s.getInetAddress());
0733: }
0734:
0735: public String getChannelName() {
0736: String encodedAddr = "";
0737: if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
0738: encodedAddr = getAddress();
0739: if (encodedAddr.startsWith("/"))
0740: encodedAddr = encodedAddr.substring(1);
0741: encodedAddr = URLEncoder.encode(encodedAddr) + "-";
0742: }
0743: return ("jk-" + encodedAddr + port);
0744: }
0745:
0746: /**
0747: * Return <code>true</code> if the specified client and server addresses
0748: * are the same. This method works around a bug in the IBM 1.1.8 JVM on
0749: * Linux, where the address bytes are returned reversed in some
0750: * circumstances.
0751: *
0752: * @param server The server's InetAddress
0753: * @param client The client's InetAddress
0754: */
0755: public static boolean isSameAddress(InetAddress server,
0756: InetAddress client) {
0757: // Compare the byte array versions of the two addresses
0758: byte serverAddr[] = server.getAddress();
0759: byte clientAddr[] = client.getAddress();
0760: if (serverAddr.length != clientAddr.length)
0761: return (false);
0762: boolean match = true;
0763: for (int i = 0; i < serverAddr.length; i++) {
0764: if (serverAddr[i] != clientAddr[i]) {
0765: match = false;
0766: break;
0767: }
0768: }
0769: if (match)
0770: return (true);
0771:
0772: // Compare the reversed form of the two addresses
0773: for (int i = 0; i < serverAddr.length; i++) {
0774: if (serverAddr[i] != clientAddr[(serverAddr.length - 1) - i])
0775: return (false);
0776: }
0777: return (true);
0778: }
0779:
0780: public void sendNewMessageNotification(Notification notification) {
0781: if (nSupport != null)
0782: nSupport.sendNotification(notification);
0783: }
0784:
0785: private NotificationBroadcasterSupport nSupport = null;
0786:
0787: public void addNotificationListener(NotificationListener listener,
0788: NotificationFilter filter, Object handback)
0789: throws IllegalArgumentException {
0790: if (nSupport == null)
0791: nSupport = new NotificationBroadcasterSupport();
0792: nSupport.addNotificationListener(listener, filter, handback);
0793: }
0794:
0795: public void removeNotificationListener(NotificationListener listener)
0796: throws ListenerNotFoundException {
0797: if (nSupport != null)
0798: nSupport.removeNotificationListener(listener);
0799: }
0800:
0801: MBeanNotificationInfo notifInfo[] = new MBeanNotificationInfo[0];
0802:
0803: public void setNotificationInfo(MBeanNotificationInfo info[]) {
0804: this .notifInfo = info;
0805: }
0806:
0807: public MBeanNotificationInfo[] getNotificationInfo() {
0808: return notifInfo;
0809: }
0810:
0811: protected class SocketConnection implements ThreadPoolRunnable {
0812: MsgContext ep;
0813: MsgAjp recv = new MsgAjp(packetSize);
0814: boolean inProgress = false;
0815:
0816: SocketConnection(MsgContext ep) {
0817: this .ep = ep;
0818: }
0819:
0820: public Object[] getInitData() {
0821: return null;
0822: }
0823:
0824: public void runIt(Object perTh[]) {
0825: if (!processConnection(ep)) {
0826: unregister(ep);
0827: }
0828: }
0829:
0830: public boolean isRunning() {
0831: return inProgress;
0832: }
0833:
0834: public void setFinished() {
0835: inProgress = false;
0836: }
0837:
0838: /** Process a single ajp connection.
0839: */
0840: boolean processConnection(MsgContext ep) {
0841: try {
0842: InputStream sis = (InputStream) ep.getNote(isNote);
0843: boolean haveInput = true;
0844: while (haveInput) {
0845: if (!running || paused) {
0846: return false;
0847: }
0848: int status = receive(recv, ep);
0849: if (status <= 0) {
0850: if (status == -3)
0851: log
0852: .debug("server has been restarted or reset this connection");
0853: else
0854: log
0855: .warn("Closing ajp connection "
0856: + status);
0857: return false;
0858: }
0859: ep.setLong(MsgContext.TIMER_RECEIVED, System
0860: .currentTimeMillis());
0861:
0862: ep.setType(0);
0863: // Will call next
0864: status = invoke(recv, ep);
0865: if (status != JkHandler.OK) {
0866: log.warn("processCallbacks status " + status);
0867: return false;
0868: }
0869: synchronized (this ) {
0870: synchronized (sis) {
0871: haveInput = sis.available() > 0;
0872: }
0873: if (!haveInput) {
0874: setFinished();
0875: } else {
0876: if (log.isDebugEnabled())
0877: log.debug("KeepAlive: "
0878: + sis.available());
0879: }
0880: }
0881: }
0882: } catch (Exception ex) {
0883: String msg = ex.getMessage();
0884: if (msg != null && msg.indexOf("Connection reset") >= 0)
0885: log
0886: .debug("Server has been restarted or reset this connection");
0887: else if (msg != null
0888: && msg.indexOf("Read timed out") >= 0)
0889: log.debug("connection timeout reached");
0890: else
0891: log.error("Error, processing connection", ex);
0892: return false;
0893: }
0894: return true;
0895: }
0896:
0897: synchronized void process(SelectionKey sk) {
0898: if (!sk.isValid()) {
0899: return;
0900: }
0901: if (sk.isReadable()) {
0902: SocketInputStream sis = (SocketInputStream) ep
0903: .getNote(isNote);
0904: boolean isok = sis.readAvailable();
0905: if (!inProgress) {
0906: if (isok) {
0907: if (sis.available() > 0 || !nioIsBroken) {
0908: inProgress = true;
0909: tp.runIt(this );
0910: }
0911: } else {
0912: unregister(ep);
0913: return;
0914: }
0915: }
0916: }
0917: if (sk.isWritable()) {
0918: Object os = ep.getNote(osNote);
0919: synchronized (os) {
0920: os.notify();
0921: }
0922: }
0923: }
0924:
0925: synchronized void unregister(MsgContext ep) {
0926: try {
0927: close(ep);
0928: } catch (Exception e) {
0929: log.error("Error closing connection", e);
0930: }
0931: try {
0932: Request req = (Request) ep.getRequest();
0933: if (req != null) {
0934: ObjectName roname = (ObjectName) ep
0935: .getNote(JMXRequestNote);
0936: if (roname != null) {
0937: Registry.getRegistry(null, null)
0938: .unregisterComponent(roname);
0939: }
0940: req.getRequestProcessor().setGlobalProcessor(null);
0941: }
0942: } catch (Exception ee) {
0943: log.error("Error, releasing connection", ee);
0944: }
0945: }
0946:
0947: void register(MsgContext ep) {
0948: Socket s = (Socket) ep.getNote(socketNote);
0949: try {
0950: s.getChannel().register(selector, SelectionKey.OP_READ,
0951: this );
0952: } catch (IOException iex) {
0953: log.error("Unable to register connection", iex);
0954: unregister(ep);
0955: }
0956: }
0957:
0958: }
0959:
0960: protected class Poller implements ThreadPoolRunnable {
0961:
0962: Poller() {
0963: }
0964:
0965: public Object[] getInitData() {
0966: return null;
0967: }
0968:
0969: public void runIt(Object perTh[]) {
0970: while (running) {
0971: try {
0972: int ns = selector.select(serverTimeout);
0973: if (log.isDebugEnabled())
0974: log.debug("Selecting " + ns + " channels");
0975: if (ns > 0) {
0976: Set sels = selector.selectedKeys();
0977: Iterator it = sels.iterator();
0978: while (it.hasNext()) {
0979: SelectionKey sk = (SelectionKey) it.next();
0980: if (sk.isValid()) {
0981: if (sk.isAcceptable()) {
0982: acceptConnections();
0983: } else {
0984: SocketConnection sc = (SocketConnection) sk
0985: .attachment();
0986: sc.process(sk);
0987: }
0988: } else {
0989: sk.cancel();
0990: }
0991: it.remove();
0992: }
0993: }
0994: } catch (ClosedSelectorException cse) {
0995: log.debug("Selector is closed");
0996: return;
0997: } catch (CancelledKeyException cke) {
0998: log.debug("Key Cancelled", cke);
0999: } catch (IOException iex) {
1000: log.warn("IO Error in select", iex);
1001: } catch (Exception ex) {
1002: log.warn("Error processing select", ex);
1003: }
1004: }
1005: }
1006: }
1007:
1008: protected class SocketInputStream extends InputStream {
1009: final int BUFFER_SIZE = 8200;
1010: private ByteBuffer buffer = ByteBuffer
1011: .allocateDirect(BUFFER_SIZE);
1012: private SocketChannel channel;
1013: private boolean blocking = false;
1014: private boolean isClosed = false;
1015: private volatile boolean dataAvailable = false;
1016:
1017: SocketInputStream(SocketChannel channel) {
1018: this .channel = channel;
1019: buffer.limit(0);
1020: }
1021:
1022: public int available() {
1023: return buffer.remaining();
1024: }
1025:
1026: public void mark(int readlimit) {
1027: buffer.mark();
1028: }
1029:
1030: public boolean markSupported() {
1031: return true;
1032: }
1033:
1034: public void reset() {
1035: buffer.reset();
1036: }
1037:
1038: public synchronized int read() throws IOException {
1039: if (!checkAvailable(1)) {
1040: block(1);
1041: }
1042: return buffer.get();
1043: }
1044:
1045: private boolean checkAvailable(int nbyte) throws IOException {
1046: if (isClosed) {
1047: throw new ClosedChannelException();
1048: }
1049: return buffer.remaining() >= nbyte;
1050: }
1051:
1052: private int fill(int nbyte) throws IOException {
1053: int rem = nbyte;
1054: int read = 0;
1055: boolean eof = false;
1056: byte[] oldData = null;
1057: if (buffer.remaining() > 0) {
1058: // should rarely happen, so short-lived GC shouldn't hurt
1059: // as much as allocating a long-lived buffer for this
1060: if (log.isDebugEnabled())
1061: log.debug("Saving old buffer: "
1062: + buffer.remaining());
1063: oldData = new byte[buffer.remaining()];
1064: buffer.get(oldData);
1065: }
1066: buffer.clear();
1067: if (oldData != null) {
1068: buffer.put(oldData);
1069: }
1070: while (rem > 0) {
1071: int count = channel.read(buffer);
1072: if (count < 0) {
1073: eof = true;
1074: break;
1075: } else if (count == 0) {
1076: log.debug("Failed to recieve signaled read: ");
1077: break;
1078: }
1079: read += count;
1080: rem -= count;
1081: }
1082: buffer.flip();
1083: return eof ? -1 : read;
1084: }
1085:
1086: synchronized boolean readAvailable() {
1087: if (blocking) {
1088: dataAvailable = true;
1089: notify();
1090: } else if (dataAvailable) {
1091: log.debug("Race Condition");
1092: } else {
1093: int nr = 0;
1094:
1095: try {
1096: nr = fill(1);
1097: } catch (ClosedChannelException cce) {
1098: log.debug("Channel is closed", cce);
1099: nr = -1;
1100: } catch (IOException iex) {
1101: log.warn("Exception processing read", iex);
1102: nr = -1; // Can't handle this yet
1103: }
1104: if (nr < 0) {
1105: isClosed = true;
1106: notify();
1107: return false;
1108: } else if (nr == 0) {
1109: if (!nioIsBroken) {
1110: dataAvailable = (buffer.remaining() <= 0);
1111: }
1112: }
1113: }
1114: return true;
1115: }
1116:
1117: public int read(byte[] data) throws IOException {
1118: return read(data, 0, data.length);
1119: }
1120:
1121: public synchronized int read(byte[] data, int offset, int len)
1122: throws IOException {
1123: int olen = len;
1124: while (!checkAvailable(len)) {
1125: int avail = buffer.remaining();
1126: if (avail > 0) {
1127: buffer.get(data, offset, avail);
1128: }
1129: len -= avail;
1130: offset += avail;
1131: block(len);
1132: }
1133: buffer.get(data, offset, len);
1134: return olen;
1135: }
1136:
1137: private void block(int len) throws IOException {
1138: if (len <= 0) {
1139: return;
1140: }
1141: if (!dataAvailable) {
1142: blocking = true;
1143: if (log.isDebugEnabled())
1144: log.debug("Waiting for " + len
1145: + " bytes to be available");
1146: try {
1147: wait(socketTimeout);
1148: } catch (InterruptedException iex) {
1149: log.debug("Interrupted", iex);
1150: }
1151: blocking = false;
1152: }
1153: if (dataAvailable) {
1154: dataAvailable = false;
1155: if (fill(len) < 0) {
1156: isClosed = true;
1157: }
1158: }
1159: }
1160: }
1161:
1162: protected class SocketOutputStream extends OutputStream {
1163: ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
1164: SocketChannel channel;
1165:
1166: SocketOutputStream(SocketChannel channel) {
1167: this .channel = channel;
1168: }
1169:
1170: public void write(int b) throws IOException {
1171: if (!checkAvailable(1)) {
1172: flush();
1173: }
1174: buffer.put((byte) b);
1175: }
1176:
1177: public void write(byte[] data) throws IOException {
1178: write(data, 0, data.length);
1179: }
1180:
1181: public void write(byte[] data, int offset, int len)
1182: throws IOException {
1183: if (!checkAvailable(len)) {
1184: flush();
1185: }
1186: buffer.put(data, offset, len);
1187: }
1188:
1189: public void flush() throws IOException {
1190: buffer.flip();
1191: while (buffer.hasRemaining()) {
1192: int count = channel.write(buffer);
1193: if (count == 0) {
1194: synchronized (this ) {
1195: SelectionKey key = channel.keyFor(selector);
1196: key.interestOps(SelectionKey.OP_WRITE);
1197: if (log.isDebugEnabled())
1198: log.debug("Blocking for channel write: "
1199: + buffer.remaining());
1200: try {
1201: wait();
1202: } catch (InterruptedException iex) {
1203: // ignore, since can't happen
1204: }
1205: key.interestOps(SelectionKey.OP_READ);
1206: }
1207: }
1208: }
1209: buffer.clear();
1210: }
1211:
1212: private boolean checkAvailable(int len) {
1213: return buffer.remaining() >= len;
1214: }
1215: }
1216:
1217: }
|