001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.tomcat.util.net;
019:
020: import java.io.IOException;
021: import java.net.BindException;
022: import java.net.InetAddress;
023: import java.net.ServerSocket;
024: import java.net.Socket;
025: import java.util.concurrent.Executor;
026:
027: import org.apache.juli.logging.Log;
028: import org.apache.juli.logging.LogFactory;
029: import org.apache.tomcat.util.res.StringManager;
030:
031: /**
032: * Handle incoming TCP connections.
033: *
034: * This class implement a simple server model: one listener thread accepts on a socket and
035: * creates a new worker thread for each incoming connection.
036: *
037: * More advanced Endpoints will reuse the threads, use queues, etc.
038: *
039: * @author James Duncan Davidson
040: * @author Jason Hunter
041: * @author James Todd
042: * @author Costin Manolache
043: * @author Gal Shachor
044: * @author Yoav Shapira
045: * @author Remy Maucherat
046: */
047: public class JIoEndpoint {
048:
049: // -------------------------------------------------------------- Constants
050:
051: protected static Log log = LogFactory.getLog(JIoEndpoint.class);
052:
053: protected StringManager sm = StringManager
054: .getManager("org.apache.tomcat.util.net.res");
055:
056: /**
057: * The Request attribute key for the cipher suite.
058: */
059: public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
060:
061: /**
062: * The Request attribute key for the key size.
063: */
064: public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
065:
066: /**
067: * The Request attribute key for the client certificate chain.
068: */
069: public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
070:
071: /**
072: * The Request attribute key for the session id.
073: * This one is a Tomcat extension to the Servlet spec.
074: */
075: public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
076:
077: // ----------------------------------------------------------------- Fields
078:
079: /**
080: * Available workers.
081: */
082: protected WorkerStack workers = null;
083:
084: /**
085: * Running state of the endpoint.
086: */
087: protected volatile boolean running = false;
088:
089: /**
090: * Will be set to true whenever the endpoint is paused.
091: */
092: protected volatile boolean paused = false;
093:
094: /**
095: * Track the initialization state of the endpoint.
096: */
097: protected boolean initialized = false;
098:
099: /**
100: * Current worker threads busy count.
101: */
102: protected int curThreadsBusy = 0;
103:
104: /**
105: * Current worker threads count.
106: */
107: protected int curThreads = 0;
108:
109: /**
110: * Sequence number used to generate thread names.
111: */
112: protected int sequence = 0;
113:
114: /**
115: * Associated server socket.
116: */
117: protected ServerSocket serverSocket = null;
118:
119: // ------------------------------------------------------------- Properties
120:
121: /**
122: * Acceptor thread count.
123: */
124: protected int acceptorThreadCount = 0;
125:
126: public void setAcceptorThreadCount(int acceptorThreadCount) {
127: this .acceptorThreadCount = acceptorThreadCount;
128: }
129:
130: public int getAcceptorThreadCount() {
131: return acceptorThreadCount;
132: }
133:
134: /**
135: * External Executor based thread pool.
136: */
137: protected Executor executor = null;
138:
139: public void setExecutor(Executor executor) {
140: this .executor = executor;
141: }
142:
143: public Executor getExecutor() {
144: return executor;
145: }
146:
147: /**
148: * Maximum amount of worker threads.
149: */
150: protected int maxThreads = 40;
151:
152: public void setMaxThreads(int maxThreads) {
153: this .maxThreads = maxThreads;
154: }
155:
156: public int getMaxThreads() {
157: return maxThreads;
158: }
159:
160: /**
161: * Priority of the acceptor and poller threads.
162: */
163: protected int threadPriority = Thread.NORM_PRIORITY;
164:
165: public void setThreadPriority(int threadPriority) {
166: this .threadPriority = threadPriority;
167: }
168:
169: public int getThreadPriority() {
170: return threadPriority;
171: }
172:
173: /**
174: * Server socket port.
175: */
176: protected int port;
177:
178: public int getPort() {
179: return port;
180: }
181:
182: public void setPort(int port) {
183: this .port = port;
184: }
185:
186: /**
187: * Address for the server socket.
188: */
189: protected InetAddress address;
190:
191: public InetAddress getAddress() {
192: return address;
193: }
194:
195: public void setAddress(InetAddress address) {
196: this .address = address;
197: }
198:
199: /**
200: * Handling of accepted sockets.
201: */
202: protected Handler handler = null;
203:
204: public void setHandler(Handler handler) {
205: this .handler = handler;
206: }
207:
208: public Handler getHandler() {
209: return handler;
210: }
211:
212: /**
213: * Allows the server developer to specify the backlog that
214: * should be used for server sockets. By default, this value
215: * is 100.
216: */
217: protected int backlog = 100;
218:
219: public void setBacklog(int backlog) {
220: if (backlog > 0)
221: this .backlog = backlog;
222: }
223:
224: public int getBacklog() {
225: return backlog;
226: }
227:
228: /**
229: * Socket TCP no delay.
230: */
231: protected boolean tcpNoDelay = false;
232:
233: public boolean getTcpNoDelay() {
234: return tcpNoDelay;
235: }
236:
237: public void setTcpNoDelay(boolean tcpNoDelay) {
238: this .tcpNoDelay = tcpNoDelay;
239: }
240:
241: /**
242: * Socket linger.
243: */
244: protected int soLinger = 100;
245:
246: public int getSoLinger() {
247: return soLinger;
248: }
249:
250: public void setSoLinger(int soLinger) {
251: this .soLinger = soLinger;
252: }
253:
254: /**
255: * Socket timeout.
256: */
257: protected int soTimeout = -1;
258:
259: public int getSoTimeout() {
260: return soTimeout;
261: }
262:
263: public void setSoTimeout(int soTimeout) {
264: this .soTimeout = soTimeout;
265: }
266:
267: /**
268: * The default is true - the created threads will be
269: * in daemon mode. If set to false, the control thread
270: * will not be daemon - and will keep the process alive.
271: */
272: protected boolean daemon = true;
273:
274: public void setDaemon(boolean b) {
275: daemon = b;
276: }
277:
278: public boolean getDaemon() {
279: return daemon;
280: }
281:
282: /**
283: * Name of the thread pool, which will be used for naming child threads.
284: */
285: protected String name = "TP";
286:
287: public void setName(String name) {
288: this .name = name;
289: }
290:
291: public String getName() {
292: return name;
293: }
294:
295: /**
296: * Server socket factory.
297: */
298: protected ServerSocketFactory serverSocketFactory = null;
299:
300: public void setServerSocketFactory(ServerSocketFactory factory) {
301: this .serverSocketFactory = factory;
302: }
303:
304: public ServerSocketFactory getServerSocketFactory() {
305: return serverSocketFactory;
306: }
307:
308: public boolean isRunning() {
309: return running;
310: }
311:
312: public boolean isPaused() {
313: return paused;
314: }
315:
316: public int getCurrentThreadCount() {
317: return curThreads;
318: }
319:
320: public int getCurrentThreadsBusy() {
321: return workers != null ? curThreads - workers.size() : 0;
322: }
323:
324: // ------------------------------------------------ Handler Inner Interface
325:
326: /**
327: * Bare bones interface used for socket processing. Per thread data is to be
328: * stored in the ThreadWithAttributes extra folders, or alternately in
329: * thread local fields.
330: */
331: public interface Handler {
332: public boolean process(Socket socket);
333: }
334:
335: // --------------------------------------------------- Acceptor Inner Class
336:
337: /**
338: * Server socket acceptor thread.
339: */
340: protected class Acceptor implements Runnable {
341:
342: /**
343: * The background thread that listens for incoming TCP/IP connections and
344: * hands them off to an appropriate processor.
345: */
346: public void run() {
347:
348: // Loop until we receive a shutdown command
349: while (running) {
350:
351: // Loop if endpoint is paused
352: while (paused) {
353: try {
354: Thread.sleep(1000);
355: } catch (InterruptedException e) {
356: // Ignore
357: }
358: }
359:
360: // Accept the next incoming connection from the server socket
361: try {
362: Socket socket = serverSocketFactory
363: .acceptSocket(serverSocket);
364: serverSocketFactory.initSocket(socket);
365: // Hand this socket off to an appropriate processor
366: if (!processSocket(socket)) {
367: // Close socket right away
368: try {
369: socket.close();
370: } catch (IOException e) {
371: // Ignore
372: }
373: }
374: } catch (IOException x) {
375: if (running)
376: log.error(sm.getString("endpoint.accept.fail"),
377: x);
378: } catch (Throwable t) {
379: log.error(sm.getString("endpoint.accept.fail"), t);
380: }
381:
382: // The processor will recycle itself when it finishes
383:
384: }
385:
386: }
387:
388: }
389:
390: // ------------------------------------------- SocketProcessor Inner Class
391:
392: /**
393: * This class is the equivalent of the Worker, but will simply use in an
394: * external Executor thread pool.
395: */
396: protected class SocketProcessor implements Runnable {
397:
398: protected Socket socket = null;
399:
400: public SocketProcessor(Socket socket) {
401: this .socket = socket;
402: }
403:
404: public void run() {
405:
406: // Process the request from this socket
407: if (!setSocketOptions(socket) || !handler.process(socket)) {
408: // Close socket
409: try {
410: socket.close();
411: } catch (IOException e) {
412: }
413: }
414:
415: // Finish up this request
416: socket = null;
417:
418: }
419:
420: }
421:
422: // ----------------------------------------------------- Worker Inner Class
423:
424: protected class Worker implements Runnable {
425:
426: protected Thread thread = null;
427: protected boolean available = false;
428: protected Socket socket = null;
429:
430: /**
431: * Process an incoming TCP/IP connection on the specified socket. Any
432: * exception that occurs during processing must be logged and swallowed.
433: * <b>NOTE</b>: This method is called from our Connector's thread. We
434: * must assign it to our own thread so that multiple simultaneous
435: * requests can be handled.
436: *
437: * @param socket TCP socket to process
438: */
439: synchronized void assign(Socket socket) {
440:
441: // Wait for the Processor to get the previous Socket
442: while (available) {
443: try {
444: wait();
445: } catch (InterruptedException e) {
446: }
447: }
448:
449: // Store the newly available Socket and notify our thread
450: this .socket = socket;
451: available = true;
452: notifyAll();
453:
454: }
455:
456: /**
457: * Await a newly assigned Socket from our Connector, or <code>null</code>
458: * if we are supposed to shut down.
459: */
460: private synchronized Socket await() {
461:
462: // Wait for the Connector to provide a new Socket
463: while (!available) {
464: try {
465: wait();
466: } catch (InterruptedException e) {
467: }
468: }
469:
470: // Notify the Connector that we have received this Socket
471: Socket socket = this .socket;
472: available = false;
473: notifyAll();
474:
475: return (socket);
476:
477: }
478:
479: /**
480: * The background thread that listens for incoming TCP/IP connections and
481: * hands them off to an appropriate processor.
482: */
483: public void run() {
484:
485: // Process requests until we receive a shutdown signal
486: while (running) {
487:
488: // Wait for the next socket to be assigned
489: Socket socket = await();
490: if (socket == null)
491: continue;
492:
493: // Process the request from this socket
494: if (!setSocketOptions(socket)
495: || !handler.process(socket)) {
496: // Close socket
497: try {
498: socket.close();
499: } catch (IOException e) {
500: }
501: }
502:
503: // Finish up this request
504: socket = null;
505: recycleWorkerThread(this );
506:
507: }
508:
509: }
510:
511: /**
512: * Start the background processing thread.
513: */
514: public void start() {
515: thread = new Thread(this );
516: thread.setName(getName() + "-" + (++curThreads));
517: thread.setDaemon(true);
518: thread.start();
519: }
520:
521: }
522:
523: // -------------------- Public methods --------------------
524:
525: public void init() throws Exception {
526:
527: if (initialized)
528: return;
529:
530: // Initialize thread count defaults for acceptor
531: if (acceptorThreadCount == 0) {
532: acceptorThreadCount = 1;
533: }
534: if (serverSocketFactory == null) {
535: serverSocketFactory = ServerSocketFactory.getDefault();
536: }
537: if (serverSocket == null) {
538: try {
539: if (address == null) {
540: serverSocket = serverSocketFactory.createSocket(
541: port, backlog);
542: } else {
543: serverSocket = serverSocketFactory.createSocket(
544: port, backlog, address);
545: }
546: } catch (BindException be) {
547: throw new BindException(be.getMessage() + ":" + port);
548: }
549: }
550: //if( serverTimeout >= 0 )
551: // serverSocket.setSoTimeout( serverTimeout );
552:
553: initialized = true;
554:
555: }
556:
557: public void start() throws Exception {
558: // Initialize socket if not done before
559: if (!initialized) {
560: init();
561: }
562: if (!running) {
563: running = true;
564: paused = false;
565:
566: // Create worker collection
567: if (executor == null) {
568: workers = new WorkerStack(maxThreads);
569: }
570:
571: // Start acceptor threads
572: for (int i = 0; i < acceptorThreadCount; i++) {
573: Thread acceptorThread = new Thread(new Acceptor(),
574: getName() + "-Acceptor-" + i);
575: acceptorThread.setPriority(threadPriority);
576: acceptorThread.setDaemon(daemon);
577: acceptorThread.start();
578: }
579: }
580: }
581:
582: public void pause() {
583: if (running && !paused) {
584: paused = true;
585: unlockAccept();
586: }
587: }
588:
589: public void resume() {
590: if (running) {
591: paused = false;
592: }
593: }
594:
595: public void stop() {
596: if (running) {
597: running = false;
598: unlockAccept();
599: }
600: }
601:
602: /**
603: * Deallocate APR memory pools, and close server socket.
604: */
605: public void destroy() throws Exception {
606: if (running) {
607: stop();
608: }
609: if (serverSocket != null) {
610: try {
611: if (serverSocket != null)
612: serverSocket.close();
613: } catch (Exception e) {
614: log.error(sm.getString("endpoint.err.close"), e);
615: }
616: serverSocket = null;
617: }
618: initialized = false;
619: }
620:
621: /**
622: * Unlock the accept by using a local connection.
623: */
624: protected void unlockAccept() {
625: Socket s = null;
626: try {
627: // Need to create a connection to unlock the accept();
628: if (address == null) {
629: s = new Socket("127.0.0.1", port);
630: } else {
631: s = new Socket(address, port);
632: // setting soLinger to a small value will help shutdown the
633: // connection quicker
634: s.setSoLinger(true, 0);
635: }
636: } catch (Exception e) {
637: if (log.isDebugEnabled()) {
638: log.debug(sm.getString("endpoint.debug.unlock", ""
639: + port), e);
640: }
641: } finally {
642: if (s != null) {
643: try {
644: s.close();
645: } catch (Exception e) {
646: // Ignore
647: }
648: }
649: }
650: }
651:
652: /**
653: * Set the options for the current socket.
654: */
655: protected boolean setSocketOptions(Socket socket) {
656: // Process the connection
657: int step = 1;
658: try {
659:
660: // 1: Set socket options: timeout, linger, etc
661: if (soLinger >= 0) {
662: socket.setSoLinger(true, soLinger);
663: }
664: if (tcpNoDelay) {
665: socket.setTcpNoDelay(tcpNoDelay);
666: }
667: if (soTimeout > 0) {
668: socket.setSoTimeout(soTimeout);
669: }
670:
671: // 2: SSL handshake
672: step = 2;
673: serverSocketFactory.handshake(socket);
674:
675: } catch (Throwable t) {
676: if (log.isDebugEnabled()) {
677: if (step == 2) {
678: log
679: .debug(
680: sm
681: .getString("endpoint.err.handshake"),
682: t);
683: } else {
684: log.debug(sm.getString("endpoint.err.unexpected"),
685: t);
686: }
687: }
688: // Tell to close the socket
689: return false;
690: }
691: return true;
692: }
693:
694: /**
695: * Create (or allocate) and return an available processor for use in
696: * processing a specific HTTP request, if possible. If the maximum
697: * allowed processors have already been created and are in use, return
698: * <code>null</code> instead.
699: */
700: protected Worker createWorkerThread() {
701:
702: synchronized (workers) {
703: if (workers.size() > 0) {
704: curThreadsBusy++;
705: return workers.pop();
706: }
707: if ((maxThreads > 0) && (curThreads < maxThreads)) {
708: curThreadsBusy++;
709: return (newWorkerThread());
710: } else {
711: if (maxThreads < 0) {
712: curThreadsBusy++;
713: return (newWorkerThread());
714: } else {
715: return (null);
716: }
717: }
718: }
719:
720: }
721:
722: /**
723: * Create and return a new processor suitable for processing HTTP
724: * requests and returning the corresponding responses.
725: */
726: protected Worker newWorkerThread() {
727:
728: Worker workerThread = new Worker();
729: workerThread.start();
730: return (workerThread);
731:
732: }
733:
734: /**
735: * Return a new worker thread, and block while to worker is available.
736: */
737: protected Worker getWorkerThread() {
738: // Allocate a new worker thread
739: Worker workerThread = createWorkerThread();
740: while (workerThread == null) {
741: try {
742: synchronized (workers) {
743: workers.wait();
744: }
745: } catch (InterruptedException e) {
746: // Ignore
747: }
748: workerThread = createWorkerThread();
749: }
750: return workerThread;
751: }
752:
753: /**
754: * Recycle the specified Processor so that it can be used again.
755: *
756: * @param workerThread The processor to be recycled
757: */
758: protected void recycleWorkerThread(Worker workerThread) {
759: synchronized (workers) {
760: workers.push(workerThread);
761: curThreadsBusy--;
762: workers.notify();
763: }
764: }
765:
766: /**
767: * Process given socket.
768: */
769: protected boolean processSocket(Socket socket) {
770: try {
771: if (executor == null) {
772: getWorkerThread().assign(socket);
773: } else {
774: executor.execute(new SocketProcessor(socket));
775: }
776: } catch (Throwable t) {
777: // This means we got an OOM or similar creating a thread, or that
778: // the pool and its queue are full
779: log.error(sm.getString("endpoint.process.fail"), t);
780: return false;
781: }
782: return true;
783: }
784:
785: // ------------------------------------------------- WorkerStack Inner Class
786:
787: public class WorkerStack {
788:
789: protected Worker[] workers = null;
790: protected int end = 0;
791:
792: public WorkerStack(int size) {
793: workers = new Worker[size];
794: }
795:
796: /**
797: * Put the object into the queue.
798: *
799: * @param object the object to be appended to the queue (first element).
800: */
801: public void push(Worker worker) {
802: workers[end++] = worker;
803: }
804:
805: /**
806: * Get the first object out of the queue. Return null if the queue
807: * is empty.
808: */
809: public Worker pop() {
810: if (end > 0) {
811: return workers[--end];
812: }
813: return null;
814: }
815:
816: /**
817: * Get the first object out of the queue, Return null if the queue
818: * is empty.
819: */
820: public Worker peek() {
821: return workers[end];
822: }
823:
824: /**
825: * Is the queue empty?
826: */
827: public boolean isEmpty() {
828: return (end == 0);
829: }
830:
831: /**
832: * How many elements are there in this queue?
833: */
834: public int size() {
835: return (end);
836: }
837: }
838:
839: }
|