001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.tomcat.util.net;
019:
020: import java.io.IOException;
021: import java.io.InterruptedIOException;
022: import java.net.BindException;
023: import java.net.InetAddress;
024: import java.net.ServerSocket;
025: import java.net.Socket;
026: import java.net.SocketException;
027: import java.security.AccessControlException;
028: import java.util.Stack;
029: import java.util.Vector;
030:
031: import org.apache.juli.logging.Log;
032: import org.apache.juli.logging.LogFactory;
033: import org.apache.tomcat.util.res.StringManager;
034: import org.apache.tomcat.util.threads.ThreadPool;
035: import org.apache.tomcat.util.threads.ThreadPoolRunnable;
036:
/* Similar to the MPM module in Apache 2.0. Handles all the details related to
   "tcp server" functionality - thread management, accept policy, etc.
   It should do nothing more - as soon as it gets a socket (and all socket options
   are set, etc.), it just hands the stream to ConnectionHandler.processConnection. (costin)
*/
042:
043: /**
044: * Handle incoming TCP connections.
045: *
046: * This class implement a simple server model: one listener thread accepts on a socket and
047: * creates a new worker thread for each incoming connection.
048: *
049: * More advanced Endpoints will reuse the threads, use queues, etc.
050: *
051: * @author James Duncan Davidson [duncan@eng.sun.com]
052: * @author Jason Hunter [jch@eng.sun.com]
053: * @author James Todd [gonzo@eng.sun.com]
054: * @author Costin@eng.sun.com
055: * @author Gal Shachor [shachor@il.ibm.com]
056: * @author Yoav Shapira <yoavs@apache.org>
057: */
058: public class PoolTcpEndpoint implements Runnable { // implements Endpoint {
059:
060: static Log log = LogFactory.getLog(PoolTcpEndpoint.class);
061:
062: private StringManager sm = StringManager
063: .getManager("org.apache.tomcat.util.net.res");
064:
065: private static final int BACKLOG = 100;
066: private static final int TIMEOUT = 1000;
067:
068: private final Object threadSync = new Object();
069:
070: private int backlog = BACKLOG;
071: private int serverTimeout = TIMEOUT;
072:
073: private InetAddress inet;
074: private int port;
075:
076: private ServerSocketFactory factory;
077: private ServerSocket serverSocket;
078:
079: private volatile boolean running = false;
080: private volatile boolean paused = false;
081: private boolean initialized = false;
082: private boolean reinitializing = false;
083: static final int debug = 0;
084:
085: protected boolean tcpNoDelay = false;
086: protected int linger = 100;
087: protected int socketTimeout = -1;
088: private boolean lf = true;
089:
090: // ------ Leader follower fields
091:
092: TcpConnectionHandler handler;
093: ThreadPoolRunnable listener;
094: ThreadPool tp;
095:
096: // ------ Master slave fields
097:
098: /* The background thread. */
099: private Thread thread = null;
100: /* Available processors. */
101: private Stack workerThreads = new Stack();
102: private int curThreads = 0;
103: private int maxThreads = 20;
104: /* All processors which have been created. */
105: private Vector created = new Vector();
106:
107: public PoolTcpEndpoint() {
108: tp = new ThreadPool();
109: }
110:
111: public PoolTcpEndpoint(ThreadPool tp) {
112: this .tp = tp;
113: }
114:
115: // -------------------- Configuration --------------------
116:
117: public void setMaxThreads(int maxThreads) {
118: if (maxThreads > 0)
119: tp.setMaxThreads(maxThreads);
120: }
121:
122: public int getMaxThreads() {
123: return tp.getMaxThreads();
124: }
125:
126: public void setMaxSpareThreads(int maxThreads) {
127: if (maxThreads > 0)
128: tp.setMaxSpareThreads(maxThreads);
129: }
130:
131: public int getMaxSpareThreads() {
132: return tp.getMaxSpareThreads();
133: }
134:
135: public void setMinSpareThreads(int minThreads) {
136: if (minThreads > 0)
137: tp.setMinSpareThreads(minThreads);
138: }
139:
140: public int getMinSpareThreads() {
141: return tp.getMinSpareThreads();
142: }
143:
144: public void setThreadPriority(int threadPriority) {
145: tp.setThreadPriority(threadPriority);
146: }
147:
148: public int getThreadPriority() {
149: return tp.getThreadPriority();
150: }
151:
152: public int getPort() {
153: return port;
154: }
155:
156: public void setPort(int port) {
157: this .port = port;
158: }
159:
160: public InetAddress getAddress() {
161: return inet;
162: }
163:
164: public void setAddress(InetAddress inet) {
165: this .inet = inet;
166: }
167:
168: public void setServerSocket(ServerSocket ss) {
169: serverSocket = ss;
170: }
171:
172: public void setServerSocketFactory(ServerSocketFactory factory) {
173: this .factory = factory;
174: }
175:
176: ServerSocketFactory getServerSocketFactory() {
177: return factory;
178: }
179:
180: public void setConnectionHandler(TcpConnectionHandler handler) {
181: this .handler = handler;
182: }
183:
184: public TcpConnectionHandler getConnectionHandler() {
185: return handler;
186: }
187:
188: public boolean isRunning() {
189: return running;
190: }
191:
192: public boolean isPaused() {
193: return paused;
194: }
195:
196: /**
197: * Allows the server developer to specify the backlog that
198: * should be used for server sockets. By default, this value
199: * is 100.
200: */
201: public void setBacklog(int backlog) {
202: if (backlog > 0)
203: this .backlog = backlog;
204: }
205:
206: public int getBacklog() {
207: return backlog;
208: }
209:
210: /**
211: * Sets the timeout in ms of the server sockets created by this
212: * server. This method allows the developer to make servers
213: * more or less responsive to having their server sockets
214: * shut down.
215: *
216: * <p>By default this value is 1000ms.
217: */
218: public void setServerTimeout(int timeout) {
219: this .serverTimeout = timeout;
220: }
221:
222: public boolean getTcpNoDelay() {
223: return tcpNoDelay;
224: }
225:
226: public void setTcpNoDelay(boolean b) {
227: tcpNoDelay = b;
228: }
229:
230: public int getSoLinger() {
231: return linger;
232: }
233:
234: public void setSoLinger(int i) {
235: linger = i;
236: }
237:
238: public int getSoTimeout() {
239: return socketTimeout;
240: }
241:
242: public void setSoTimeout(int i) {
243: socketTimeout = i;
244: }
245:
246: public int getServerSoTimeout() {
247: return serverTimeout;
248: }
249:
250: public void setServerSoTimeout(int i) {
251: serverTimeout = i;
252: }
253:
254: public String getStrategy() {
255: if (lf) {
256: return "lf";
257: } else {
258: return "ms";
259: }
260: }
261:
262: public void setStrategy(String strategy) {
263: if ("ms".equals(strategy)) {
264: lf = false;
265: } else {
266: lf = true;
267: }
268: }
269:
270: public int getCurrentThreadCount() {
271: return curThreads;
272: }
273:
274: public int getCurrentThreadsBusy() {
275: return curThreads - workerThreads.size();
276: }
277:
278: // -------------------- Public methods --------------------
279:
280: public void initEndpoint() throws IOException,
281: InstantiationException {
282: try {
283: if (factory == null)
284: factory = ServerSocketFactory.getDefault();
285: if (serverSocket == null) {
286: try {
287: if (inet == null) {
288: serverSocket = factory.createSocket(port,
289: backlog);
290: } else {
291: serverSocket = factory.createSocket(port,
292: backlog, inet);
293: }
294: } catch (BindException be) {
295: throw new BindException(be.getMessage() + ":"
296: + port);
297: }
298: }
299: if (serverTimeout >= 0)
300: serverSocket.setSoTimeout(serverTimeout);
301: } catch (IOException ex) {
302: throw ex;
303: } catch (InstantiationException ex1) {
304: throw ex1;
305: }
306: initialized = true;
307: }
308:
309: public void startEndpoint() throws IOException,
310: InstantiationException {
311: if (!initialized) {
312: initEndpoint();
313: }
314: if (lf) {
315: tp.start();
316: }
317: running = true;
318: paused = false;
319: if (lf) {
320: listener = new LeaderFollowerWorkerThread(this );
321: tp.runIt(listener);
322: } else {
323: maxThreads = getMaxThreads();
324: threadStart();
325: }
326: }
327:
328: public void pauseEndpoint() {
329: if (running && !paused) {
330: paused = true;
331: unlockAccept();
332: }
333: }
334:
335: public void resumeEndpoint() {
336: if (running) {
337: paused = false;
338: }
339: }
340:
341: public void stopEndpoint() {
342: if (running) {
343: if (lf) {
344: tp.shutdown();
345: }
346: running = false;
347: if (serverSocket != null) {
348: closeServerSocket();
349: }
350: if (!lf) {
351: threadStop();
352: }
353: initialized = false;
354: }
355: }
356:
357: protected void closeServerSocket() {
358: if (!paused)
359: unlockAccept();
360: try {
361: if (serverSocket != null)
362: serverSocket.close();
363: } catch (Exception e) {
364: log.error(sm.getString("endpoint.err.close"), e);
365: }
366: serverSocket = null;
367: }
368:
369: protected void unlockAccept() {
370: Socket s = null;
371: try {
372: // Need to create a connection to unlock the accept();
373: if (inet == null) {
374: s = new Socket("127.0.0.1", port);
375: } else {
376: s = new Socket(inet, port);
377: // setting soLinger to a small value will help shutdown the
378: // connection quicker
379: s.setSoLinger(true, 0);
380: }
381: } catch (Exception e) {
382: if (log.isDebugEnabled()) {
383: log.debug(sm.getString("endpoint.debug.unlock", ""
384: + port), e);
385: }
386: } finally {
387: if (s != null) {
388: try {
389: s.close();
390: } catch (Exception e) {
391: // Ignore
392: }
393: }
394: }
395: }
396:
397: // -------------------- Private methods
398:
399: Socket acceptSocket() {
400: if (!running || serverSocket == null)
401: return null;
402:
403: Socket accepted = null;
404:
405: try {
406: if (factory == null) {
407: accepted = serverSocket.accept();
408: } else {
409: accepted = factory.acceptSocket(serverSocket);
410: }
411: if (null == accepted) {
412: log.warn(sm.getString("endpoint.warn.nullSocket"));
413: } else {
414: if (!running) {
415: accepted.close(); // rude, but unlikely!
416: accepted = null;
417: } else if (factory != null) {
418: factory.initSocket(accepted);
419: }
420: }
421: } catch (InterruptedIOException iioe) {
422: // normal part -- should happen regularly so
423: // that the endpoint can release if the server
424: // is shutdown.
425: } catch (AccessControlException ace) {
426: // When using the Java SecurityManager this exception
427: // can be thrown if you are restricting access to the
428: // socket with SocketPermission's.
429: // Log the unauthorized access and continue
430: String msg = sm.getString("endpoint.warn.security",
431: serverSocket, ace);
432: log.warn(msg);
433: } catch (IOException e) {
434:
435: String msg = null;
436:
437: if (running) {
438: msg = sm.getString("endpoint.err.nonfatal",
439: serverSocket, e);
440: log.error(msg, e);
441: }
442:
443: if (accepted != null) {
444: try {
445: accepted.close();
446: } catch (Throwable ex) {
447: msg = sm.getString("endpoint.err.nonfatal",
448: accepted, ex);
449: log.warn(msg, ex);
450: }
451: accepted = null;
452: }
453:
454: if (!running)
455: return null;
456: reinitializing = true;
457: // Restart endpoint when getting an IOException during accept
458: synchronized (threadSync) {
459: if (reinitializing) {
460: reinitializing = false;
461: // 1) Attempt to close server socket
462: closeServerSocket();
463: initialized = false;
464: // 2) Reinit endpoint (recreate server socket)
465: try {
466: msg = sm.getString("endpoint.warn.reinit");
467: log.warn(msg);
468: initEndpoint();
469: } catch (Throwable t) {
470: msg = sm.getString("endpoint.err.nonfatal",
471: serverSocket, t);
472: log.error(msg, t);
473: }
474: // 3) If failed, attempt to restart endpoint
475: if (!initialized) {
476: msg = sm.getString("endpoint.warn.restart");
477: log.warn(msg);
478: try {
479: stopEndpoint();
480: initEndpoint();
481: startEndpoint();
482: } catch (Throwable t) {
483: msg = sm.getString("endpoint.err.fatal",
484: serverSocket, t);
485: log.error(msg, t);
486: }
487: // Current thread is now invalid: kill it
488: throw new ThreadDeath();
489: }
490: }
491: }
492:
493: }
494:
495: return accepted;
496: }
497:
498: void setSocketOptions(Socket socket) throws SocketException {
499: if (linger >= 0)
500: socket.setSoLinger(true, linger);
501: if (tcpNoDelay)
502: socket.setTcpNoDelay(tcpNoDelay);
503: if (socketTimeout > 0)
504: socket.setSoTimeout(socketTimeout);
505: }
506:
507: void processSocket(Socket s, TcpConnection con, Object[] threadData) {
508: // Process the connection
509: int step = 1;
510: try {
511:
512: // 1: Set socket options: timeout, linger, etc
513: setSocketOptions(s);
514:
515: // 2: SSL handshake
516: step = 2;
517: if (getServerSocketFactory() != null) {
518: getServerSocketFactory().handshake(s);
519: }
520:
521: // 3: Process the connection
522: step = 3;
523: con.setEndpoint(this );
524: con.setSocket(s);
525: getConnectionHandler().processConnection(con, threadData);
526:
527: } catch (SocketException se) {
528: log.debug(sm.getString("endpoint.err.socket", s
529: .getInetAddress()), se);
530: // Try to close the socket
531: try {
532: s.close();
533: } catch (IOException e) {
534: }
535: } catch (Throwable t) {
536: if (step == 2) {
537: if (log.isDebugEnabled()) {
538: log
539: .debug(
540: sm
541: .getString("endpoint.err.handshake"),
542: t);
543: }
544: } else {
545: log.error(sm.getString("endpoint.err.unexpected"), t);
546: }
547: // Try to close the socket
548: try {
549: s.close();
550: } catch (IOException e) {
551: }
552: } finally {
553: if (con != null) {
554: con.recycle();
555: }
556: }
557: }
558:
559: // -------------------------------------------------- Master Slave Methods
560:
561: /**
562: * Create (or allocate) and return an available processor for use in
563: * processing a specific HTTP request, if possible. If the maximum
564: * allowed processors have already been created and are in use, return
565: * <code>null</code> instead.
566: */
567: private MasterSlaveWorkerThread createWorkerThread() {
568:
569: synchronized (workerThreads) {
570: if (workerThreads.size() > 0) {
571: return ((MasterSlaveWorkerThread) workerThreads.pop());
572: }
573: if ((maxThreads > 0) && (curThreads < maxThreads)) {
574: return (newWorkerThread());
575: } else {
576: if (maxThreads < 0) {
577: return (newWorkerThread());
578: } else {
579: return (null);
580: }
581: }
582: }
583:
584: }
585:
586: /**
587: * Create and return a new processor suitable for processing HTTP
588: * requests and returning the corresponding responses.
589: */
590: private MasterSlaveWorkerThread newWorkerThread() {
591:
592: MasterSlaveWorkerThread workerThread = new MasterSlaveWorkerThread(
593: this , tp.getName() + "-" + (++curThreads));
594: workerThread.start();
595: created.addElement(workerThread);
596: return (workerThread);
597:
598: }
599:
600: /**
601: * Recycle the specified Processor so that it can be used again.
602: *
603: * @param processor The processor to be recycled
604: */
605: void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
606: workerThreads.push(workerThread);
607: }
608:
609: /**
610: * The background thread that listens for incoming TCP/IP connections and
611: * hands them off to an appropriate processor.
612: */
613: public void run() {
614:
615: // Loop until we receive a shutdown command
616: while (running) {
617:
618: // Loop if endpoint is paused
619: while (paused) {
620: try {
621: Thread.sleep(1000);
622: } catch (InterruptedException e) {
623: // Ignore
624: }
625: }
626:
627: // Allocate a new worker thread
628: MasterSlaveWorkerThread workerThread = createWorkerThread();
629: if (workerThread == null) {
630: try {
631: // Wait a little for load to go down: as a result,
632: // no accept will be made until the concurrency is
633: // lower than the specified maxThreads, and current
634: // connections will wait for a little bit instead of
635: // failing right away.
636: Thread.sleep(100);
637: } catch (InterruptedException e) {
638: // Ignore
639: }
640: continue;
641: }
642:
643: // Accept the next incoming connection from the server socket
644: Socket socket = acceptSocket();
645:
646: // Hand this socket off to an appropriate processor
647: workerThread.assign(socket);
648:
649: // The processor will recycle itself when it finishes
650:
651: }
652:
653: // Notify the threadStop() method that we have shut ourselves down
654: synchronized (threadSync) {
655: threadSync.notifyAll();
656: }
657:
658: }
659:
660: /**
661: * Start the background processing thread.
662: */
663: private void threadStart() {
664: thread = new Thread(this , tp.getName());
665: thread.setPriority(getThreadPriority());
666: thread.setDaemon(true);
667: thread.start();
668: }
669:
670: /**
671: * Stop the background processing thread.
672: */
673: private void threadStop() {
674: thread = null;
675: }
676:
677: }
|