001: /*
002: * Copyright 1999-2004 The Apache Software Foundation
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.tomcat.util.net;
018:
019: import java.io.IOException;
020: import java.io.InterruptedIOException;
021: import java.net.BindException;
022: import java.net.InetAddress;
023: import java.net.ServerSocket;
024: import java.net.Socket;
025: import java.net.SocketException;
026: import java.security.AccessControlException;
027:
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030: import org.apache.tomcat.util.res.StringManager;
031: import org.apache.tomcat.util.threads.ThreadPool;
032: import org.apache.tomcat.util.threads.ThreadPoolRunnable;
033:
034: /* Similar with MPM module in Apache2.0. Handles all the details related with
035: "tcp server" functionality - thread management, accept policy, etc.
036: It should do nothing more - as soon as it get a socket ( and all socket options
037: are set, etc), it just handle the stream to ConnectionHandler.processConnection. (costin)
038: */
039:
040: /**
041: * Handle incoming TCP connections.
042: *
043: * This class implement a simple server model: one listener thread accepts on a socket and
044: * creates a new worker thread for each incoming connection.
045: *
046: * More advanced Endpoints will reuse the threads, use queues, etc.
047: *
048: * @author James Duncan Davidson [duncan@eng.sun.com]
049: * @author Jason Hunter [jch@eng.sun.com]
050: * @author James Todd [gonzo@eng.sun.com]
051: * @author Costin@eng.sun.com
052: * @author Gal Shachor [shachor@il.ibm.com]
053: * @author Yoav Shapira <yoavs@apache.org>
054: */
055: public class PoolTcpEndpoint { // implements Endpoint {
056:
057: private StringManager sm = StringManager
058: .getManager("org.apache.tomcat.util.net.res");
059:
060: private static final int BACKLOG = 100;
061: private static final int TIMEOUT = 1000;
062:
063: private final Object threadSync = new Object();
064:
065: private boolean isPool = true;
066:
067: private int backlog = BACKLOG;
068: private int serverTimeout = TIMEOUT;
069:
070: TcpConnectionHandler handler;
071:
072: private InetAddress inet;
073: private int port;
074:
075: private ServerSocketFactory factory;
076: private ServerSocket serverSocket;
077:
078: ThreadPoolRunnable listener;
079: private volatile boolean running = false;
080: private volatile boolean paused = false;
081: private boolean initialized = false;
082: private boolean reinitializing = false;
083: static final int debug = 0;
084:
085: ThreadPool tp;
086:
087: static Log log = LogFactory.getLog(PoolTcpEndpoint.class);
088:
089: protected boolean tcpNoDelay = false;
090: protected int linger = 100;
091: protected int socketTimeout = -1;
092:
093: public PoolTcpEndpoint() {
094: tp = new ThreadPool();
095: }
096:
097: public PoolTcpEndpoint(ThreadPool tp) {
098: this .tp = tp;
099: }
100:
101: // -------------------- Configuration --------------------
102:
103: public void setPoolOn(boolean isPool) {
104: this .isPool = isPool;
105: }
106:
107: public boolean isPoolOn() {
108: return isPool;
109: }
110:
111: public void setMaxThreads(int maxThreads) {
112: if (maxThreads > 0)
113: tp.setMaxThreads(maxThreads);
114: }
115:
116: public int getMaxThreads() {
117: return tp.getMaxThreads();
118: }
119:
120: public void setMaxSpareThreads(int maxThreads) {
121: if (maxThreads > 0)
122: tp.setMaxSpareThreads(maxThreads);
123: }
124:
125: public int getMaxSpareThreads() {
126: return tp.getMaxSpareThreads();
127: }
128:
129: public void setMinSpareThreads(int minThreads) {
130: if (minThreads > 0)
131: tp.setMinSpareThreads(minThreads);
132: }
133:
134: public int getMinSpareThreads() {
135: return tp.getMinSpareThreads();
136: }
137:
138: public void setThreadPriority(int threadPriority) {
139: tp.setThreadPriority(threadPriority);
140: }
141:
142: public int getThreadPriority() {
143: return tp.getThreadPriority();
144: }
145:
146: public int getPort() {
147: return port;
148: }
149:
150: public void setPort(int port) {
151: this .port = port;
152: }
153:
154: public InetAddress getAddress() {
155: return inet;
156: }
157:
158: public void setAddress(InetAddress inet) {
159: this .inet = inet;
160: }
161:
162: public void setServerSocket(ServerSocket ss) {
163: serverSocket = ss;
164: }
165:
166: public void setServerSocketFactory(ServerSocketFactory factory) {
167: this .factory = factory;
168: }
169:
170: ServerSocketFactory getServerSocketFactory() {
171: return factory;
172: }
173:
174: public void setConnectionHandler(TcpConnectionHandler handler) {
175: this .handler = handler;
176: }
177:
178: public TcpConnectionHandler getConnectionHandler() {
179: return handler;
180: }
181:
182: public boolean isRunning() {
183: return running;
184: }
185:
186: public boolean isPaused() {
187: return paused;
188: }
189:
190: /**
191: * Allows the server developer to specify the backlog that
192: * should be used for server sockets. By default, this value
193: * is 100.
194: */
195: public void setBacklog(int backlog) {
196: if (backlog > 0)
197: this .backlog = backlog;
198: }
199:
200: public int getBacklog() {
201: return backlog;
202: }
203:
204: /**
205: * Sets the timeout in ms of the server sockets created by this
206: * server. This method allows the developer to make servers
207: * more or less responsive to having their server sockets
208: * shut down.
209: *
210: * <p>By default this value is 1000ms.
211: */
212: public void setServerTimeout(int timeout) {
213: this .serverTimeout = timeout;
214: }
215:
216: public boolean getTcpNoDelay() {
217: return tcpNoDelay;
218: }
219:
220: public void setTcpNoDelay(boolean b) {
221: tcpNoDelay = b;
222: }
223:
224: public int getSoLinger() {
225: return linger;
226: }
227:
228: public void setSoLinger(int i) {
229: linger = i;
230: }
231:
232: public int getSoTimeout() {
233: return socketTimeout;
234: }
235:
236: public void setSoTimeout(int i) {
237: socketTimeout = i;
238: }
239:
240: public int getServerSoTimeout() {
241: return serverTimeout;
242: }
243:
244: public void setServerSoTimeout(int i) {
245: serverTimeout = i;
246: }
247:
248: // -------------------- Public methods --------------------
249:
250: public void initEndpoint() throws IOException,
251: InstantiationException {
252: try {
253: if (factory == null)
254: factory = ServerSocketFactory.getDefault();
255: if (serverSocket == null) {
256: try {
257: if (inet == null) {
258: serverSocket = factory.createSocket(port,
259: backlog);
260: } else {
261: serverSocket = factory.createSocket(port,
262: backlog, inet);
263: }
264: } catch (BindException be) {
265: throw new BindException(be.getMessage() + ":"
266: + port);
267: }
268: }
269: if (serverTimeout >= 0)
270: serverSocket.setSoTimeout(serverTimeout);
271: } catch (IOException ex) {
272: // log("couldn't start endpoint", ex, Logger.DEBUG);
273: throw ex;
274: } catch (InstantiationException ex1) {
275: // log("couldn't start endpoint", ex1, Logger.DEBUG);
276: throw ex1;
277: }
278: initialized = true;
279: }
280:
281: public void startEndpoint() throws IOException,
282: InstantiationException {
283: if (!initialized) {
284: initEndpoint();
285: }
286: if (isPool) {
287: tp.start();
288: }
289: running = true;
290: paused = false;
291: if (isPool) {
292: listener = new TcpWorkerThread(this );
293: tp.runIt(listener);
294: } else {
295: log.error("XXX Error - need pool !");
296: }
297: }
298:
299: public void pauseEndpoint() {
300: if (running && !paused) {
301: paused = true;
302: unlockAccept();
303: }
304: }
305:
306: public void resumeEndpoint() {
307: if (running) {
308: paused = false;
309: }
310: }
311:
312: public void stopEndpoint() {
313: if (running) {
314: tp.shutdown();
315: running = false;
316: if (serverSocket != null) {
317: closeServerSocket();
318: }
319: }
320: }
321:
322: protected void closeServerSocket() {
323: if (!paused)
324: unlockAccept();
325: try {
326: if (serverSocket != null)
327: serverSocket.close();
328: } catch (Exception e) {
329: log.error("Caught exception trying to close socket.", e);
330: }
331: serverSocket = null;
332: }
333:
334: protected void unlockAccept() {
335: Socket s = null;
336: try {
337: // Need to create a connection to unlock the accept();
338: if (inet == null) {
339: s = new Socket("127.0.0.1", port);
340: } else {
341: s = new Socket(inet, port);
342: // setting soLinger to a small value will help shutdown the
343: // connection quicker
344: s.setSoLinger(true, 0);
345: }
346: } catch (Exception e) {
347: log.debug("Caught exception trying to unlock accept on "
348: + port + " " + e.toString());
349: } finally {
350: if (s != null) {
351: try {
352: s.close();
353: } catch (Exception e) {
354: // Ignore
355: }
356: }
357: }
358: }
359:
360: // -------------------- Private methods
361:
362: Socket acceptSocket() {
363: if (!running || serverSocket == null)
364: return null;
365:
366: Socket accepted = null;
367:
368: try {
369: if (factory == null) {
370: accepted = serverSocket.accept();
371: } else {
372: accepted = factory.acceptSocket(serverSocket);
373: }
374: if (null == accepted) {
375: log.warn("Null socket returned by accept");
376: } else {
377: if (!running) {
378: accepted.close(); // rude, but unlikely!
379: accepted = null;
380: } else if (factory != null) {
381: factory.initSocket(accepted);
382: }
383: }
384: } catch (InterruptedIOException iioe) {
385: // normal part -- should happen regularly so
386: // that the endpoint can release if the server
387: // is shutdown.
388: } catch (AccessControlException ace) {
389: // When using the Java SecurityManager this exception
390: // can be thrown if you are restricting access to the
391: // socket with SocketPermission's.
392: // Log the unauthorized access and continue
393: String msg = sm.getString("endpoint.warn.security",
394: serverSocket, ace);
395: log.warn(msg);
396: } catch (IOException e) {
397:
398: String msg = null;
399:
400: if (running) {
401: msg = sm.getString("endpoint.err.nonfatal",
402: serverSocket, e);
403: log.error(msg, e);
404: }
405:
406: if (accepted != null) {
407: try {
408: accepted.close();
409: } catch (Throwable ex) {
410: msg = sm.getString("endpoint.err.nonfatal",
411: accepted, ex);
412: log.warn(msg, ex);
413: }
414: accepted = null;
415: }
416:
417: if (!running)
418: return null;
419: reinitializing = true;
420: // Restart endpoint when getting an IOException during accept
421: synchronized (threadSync) {
422: if (reinitializing) {
423: reinitializing = false;
424: // 1) Attempt to close server socket
425: closeServerSocket();
426: initialized = false;
427: // 2) Reinit endpoint (recreate server socket)
428: try {
429: msg = sm.getString("endpoint.warn.reinit");
430: log.warn(msg);
431: initEndpoint();
432: } catch (Throwable t) {
433: msg = sm.getString("endpoint.err.nonfatal",
434: serverSocket, t);
435: log.error(msg, t);
436: }
437: // 3) If failed, attempt to restart endpoint
438: if (!initialized) {
439: msg = sm.getString("endpoint.warn.restart");
440: log.warn(msg);
441: try {
442: stopEndpoint();
443: initEndpoint();
444: startEndpoint();
445: } catch (Throwable t) {
446: msg = sm.getString("endpoint.err.fatal",
447: serverSocket, t);
448: log.error(msg, t);
449: } finally {
450: // Current thread is now invalid: kill it
451: throw new ThreadDeath();
452: }
453: }
454: }
455: }
456:
457: }
458:
459: return accepted;
460: }
461:
462: /** @deprecated
463: */
464: public void log(String msg) {
465: log.info(msg);
466: }
467:
468: /** @deprecated
469: */
470: public void log(String msg, Throwable t) {
471: log.error(msg, t);
472: }
473:
474: /** @deprecated
475: */
476: public void log(String msg, int level) {
477: log.info(msg);
478: }
479:
480: /** @deprecated
481: */
482: public void log(String msg, Throwable t, int level) {
483: log.error(msg, t);
484: }
485:
486: void setSocketOptions(Socket socket) throws SocketException {
487: if (linger >= 0)
488: socket.setSoLinger(true, linger);
489: if (tcpNoDelay)
490: socket.setTcpNoDelay(tcpNoDelay);
491: if (socketTimeout > 0)
492: socket.setSoTimeout(socketTimeout);
493: }
494:
495: }
496:
497: // -------------------- Threads --------------------
498:
499: /*
500: * I switched the threading model here.
501: *
502: * We used to have a "listener" thread and a "connection"
503: * thread, this results in code simplicity but also a needless
504: * thread switch.
505: *
506: * Instead I am now using a pool of threads, all the threads are
507: * simmetric in their execution and no thread switch is needed.
508: */
509: class TcpWorkerThread implements ThreadPoolRunnable {
510: /* This is not a normal Runnable - it gets attached to an existing
511: thread, runs and when run() ends - the thread keeps running.
512:
513: It's better to keep the name ThreadPoolRunnable - avoid confusion.
514: We also want to use per/thread data and avoid sync wherever possible.
515: */
516: PoolTcpEndpoint endpoint;
517:
518: public TcpWorkerThread(PoolTcpEndpoint endpoint) {
519: this .endpoint = endpoint;
520: }
521:
522: public Object[] getInitData() {
523: // no synchronization overhead, but 2 array access
524: Object obj[] = new Object[2];
525: obj[1] = endpoint.getConnectionHandler().init();
526: obj[0] = new TcpConnection();
527: return obj;
528: }
529:
530: public void runIt(Object perThrData[]) {
531:
532: // Create per-thread cache
533: if (endpoint.isRunning()) {
534:
535: // Loop if endpoint is paused
536: while (endpoint.isPaused()) {
537: try {
538: Thread.sleep(1000);
539: } catch (InterruptedException e) {
540: // Ignore
541: }
542: }
543:
544: // Accept a new connection
545: Socket s = null;
546: try {
547: s = endpoint.acceptSocket();
548: } finally {
549: // Continue accepting on another thread...
550: if (endpoint.isRunning()) {
551: endpoint.tp.runIt(this );
552: }
553: }
554:
555: // Process the connection
556: if (null != s) {
557: TcpConnection con = null;
558: int step = 1;
559: try {
560:
561: // 1: Set socket options: timeout, linger, etc
562: endpoint.setSocketOptions(s);
563:
564: // 2: SSL handshake
565: step = 2;
566: if (endpoint.getServerSocketFactory() != null) {
567: endpoint.getServerSocketFactory().handshake(s);
568: }
569:
570: // 3: Process the connection
571: step = 3;
572: con = (TcpConnection) perThrData[0];
573: con.setEndpoint(endpoint);
574: con.setSocket(s);
575: endpoint.getConnectionHandler().processConnection(
576: con, (Object[]) perThrData[1]);
577:
578: } catch (SocketException se) {
579: PoolTcpEndpoint.log.error("Remote Host "
580: + s.getInetAddress() + " SocketException: "
581: + se.getMessage());
582: // Try to close the socket
583: try {
584: s.close();
585: } catch (IOException e) {
586: }
587: } catch (Throwable t) {
588: if (step == 2) {
589: PoolTcpEndpoint.log
590: .debug("Handshake failed", t);
591: } else {
592: PoolTcpEndpoint.log
593: .error("Unexpected error", t);
594: }
595: // Try to close the socket
596: try {
597: s.close();
598: } catch (IOException e) {
599: }
600: } finally {
601: if (con != null) {
602: con.recycle();
603: }
604: }
605: }
606:
607: }
608: }
609:
610: }
|