001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.util.connection;
019:
020: import java.io.IOException;
021: import java.io.InterruptedIOException;
022: import java.net.ServerSocket;
023: import java.net.Socket;
024: import java.net.SocketException;
025: import java.util.ArrayList;
026: import java.util.Iterator;
027:
028: import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
029: import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
030: import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
031: import org.apache.avalon.excalibur.pool.ObjectFactory;
032: import org.apache.avalon.excalibur.pool.Pool;
033: import org.apache.avalon.excalibur.pool.Poolable;
034: import org.apache.excalibur.thread.ThreadPool;
035: import org.apache.avalon.framework.activity.Initializable;
036: import org.apache.avalon.framework.container.ContainerUtil;
037: import org.apache.avalon.framework.logger.AbstractLogEnabled;
038:
039: /**
040: * Represents a single server socket managed by a connection manager.
041: * The connection manager will spawn a single ServerConnection for each
042: * server socket that the connection manager is managing.
043: *
044: */
045: public class ServerConnection extends AbstractLogEnabled implements
046: Initializable, Runnable {
047:
048: /**
049: * This is a hack to deal with the fact that there appears to be
050: * no platform-independent way to break out of a ServerSocket
051: * accept() call. On some platforms closing either the ServerSocket
052: * itself, or its associated InputStream, causes the accept
053: * method to exit. Unfortunately, this behavior is not consistent
054: * across platforms. The deal with this, we introduce a polling
055: * loop of 20 seconds for the server socket. This introduces a
056: * cost across platforms, but is necessary to maintain cross-platform
057: * functionality.
058: */
059: private static int POLLING_INTERVAL = 20 * 1000;
060:
061: /**
062: * The server socket which this connection is managing
063: */
064: private ServerSocket serverSocket;
065:
066: /**
067: * The connection handler factory that generates connection
068: * handlers to manage client connections to this server socket
069: */
070: private ConnectionHandlerFactory handlerFactory;
071:
072: /**
073: * The pool that produces ClientConnectionRunners
074: */
075: private Pool runnerPool;
076:
077: /**
078: * The factory used to provide ClientConnectionRunner objects
079: */
080: private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
081:
082: /**
083: * The thread pool used to spawn individual threads used to manage each
084: * client connection.
085: */
086: private ThreadPool connThreadPool;
087:
088: /**
089: * The timeout for client sockets spawned off this connection.
090: */
091: private int socketTimeout;
092:
093: /**
094: * The maximum number of open client connections that this server
095: * connection will allow.
096: */
097: private int maxOpenConn;
098:
099: /**
100: * A collection of client connection runners.
101: */
102: private final ArrayList clientConnectionRunners = new ArrayList();
103:
104: /**
105: * The thread used to manage this server connection.
106: */
107: private Thread serverConnectionThread;
108:
109: /**
110: * The sole constructor for a ServerConnection.
111: *
112: * @param serverSocket the ServerSocket associated with this ServerConnection
113: * @param handlerFactory the factory that generates ConnectionHandlers for the client
114: * connections spawned off this ServerConnection
115: * @param threadPool the ThreadPool used to obtain handler threads
116: * @param timeout the client idle timeout for this ServerConnection's client connections
117: * @param maxOpenConn the maximum number of open client connections allowed for this
118: * ServerConnection
119: */
120: public ServerConnection(ServerSocket serverSocket,
121: ConnectionHandlerFactory handlerFactory,
122: ThreadPool threadPool, int timeout, int maxOpenConn) {
123: this .serverSocket = serverSocket;
124: this .handlerFactory = handlerFactory;
125: connThreadPool = threadPool;
126: socketTimeout = timeout;
127: this .maxOpenConn = maxOpenConn;
128: }
129:
130: /**
131: * @see org.apache.avalon.framework.activity.Initializable#initialize()
132: */
133: public void initialize() throws Exception {
134: runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5,
135: maxOpenConn);
136: ContainerUtil.enableLogging(runnerPool, getLogger());
137: ContainerUtil.initialize(runnerPool);
138: }
139:
140: /**
141: * The dispose operation is called by the owning ConnectionManager
142: * at the end of its lifecycle. Cleans up the server connection, forcing
143: * everything to finish.
144: */
145: public void dispose() {
146: if (getLogger().isDebugEnabled()) {
147: getLogger().debug(
148: "Disposing server connection..." + this .toString());
149: }
150: synchronized (this ) {
151: if (null != serverConnectionThread) {
152: // Execution of this block means that the run() method
153: // hasn't finished yet. So we interrupt the thread
154: // to terminate run() and wait for the run() method
155: // to finish. The notifyAll() at the end of run() will
156: // wake this thread and allow dispose() to end.
157: Thread thread = serverConnectionThread;
158: serverConnectionThread = null;
159: thread.interrupt();
160: try {
161: serverSocket.close();
162: } catch (IOException ie) {
163: // Ignored - we're doing this to break out of the
164: // accept. This minimizes the time required to
165: // shutdown the server. Unfortunately, this is
166: // not guaranteed to work on all platforms. See
167: // the comments for POLLING_INTERVAL
168: }
169: try {
170: if (POLLING_INTERVAL > 0) {
171: wait(2L * POLLING_INTERVAL);
172: } else {
173: wait();
174: }
175: } catch (InterruptedException ie) {
176: // Expected - just complete dispose()
177: }
178: }
179: ContainerUtil.dispose(runnerPool);
180: runnerPool = null;
181: }
182:
183: getLogger().debug(
184: "Closed server connection - cleaning up clients - "
185: + this .toString());
186:
187: synchronized (clientConnectionRunners) {
188: Iterator runnerIterator = clientConnectionRunners
189: .iterator();
190: while (runnerIterator.hasNext()) {
191: ClientConnectionRunner runner = (ClientConnectionRunner) runnerIterator
192: .next();
193: runner.dispose();
194: runner = null;
195: }
196: clientConnectionRunners.clear();
197: }
198:
199: getLogger().debug("Cleaned up clients - " + this .toString());
200:
201: }
202:
203: /**
204: * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
205: *
206: * @param clientConnectionRunner the ClientConnectionRunner to be added
207: */
208: private ClientConnectionRunner addClientConnectionRunner()
209: throws Exception {
210: synchronized (clientConnectionRunners) {
211: ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner) runnerPool
212: .get();
213: clientConnectionRunners.add(clientConnectionRunner);
214: if (getLogger().isDebugEnabled()) {
215: getLogger().debug(
216: "Adding one connection for a total of "
217: + clientConnectionRunners.size());
218: }
219: return clientConnectionRunner;
220: }
221: }
222:
223: /**
224: * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
225: *
226: * @param clientConnectionRunner the ClientConnectionRunner to be removed
227: */
228: private void removeClientConnectionRunner(
229: ClientConnectionRunner clientConnectionRunner) {
230:
231: /*
232: * checking runnerPool avoids 'dead-lock' when service is disposing :
233: * (dispose() calls dispose all runners)
234: * but runner is 'running' and cleans up on exit
235: * this situation will result in a dead-lock on 'clientConnectionRunners'
236: */
237: if (runnerPool == null) {
238: getLogger()
239: .info(
240: "ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : "
241: + clientConnectionRunner);
242: return;
243: }
244:
245: synchronized (clientConnectionRunners) {
246: if (clientConnectionRunners.remove(clientConnectionRunner)) {
247: if (getLogger().isDebugEnabled()) {
248: getLogger().debug(
249: "Releasing one connection, leaving a total of "
250: + clientConnectionRunners.size());
251: }
252: runnerPool.put(clientConnectionRunner);
253: }
254: }
255:
256: synchronized (this ) {
257: notify();
258: } // match the wait(...) in the run() inner loop before accept().
259: }
260:
261: /**
262: * Provides the body for the thread of execution for a ServerConnection.
263: * Connections made to the server socket are passed to an appropriate,
264: * newly created, ClientConnectionRunner
265: */
266: public void run() {
267: serverConnectionThread = Thread.currentThread();
268:
269: int ioExceptionCount = 0;
270: try {
271: serverSocket.setSoTimeout(POLLING_INTERVAL);
272: } catch (SocketException se) {
273: // Ignored - for the moment
274: }
275:
276: if ((getLogger().isDebugEnabled())
277: && (serverConnectionThread != null)) {
278: StringBuffer debugBuffer = new StringBuffer(128).append(
279: serverConnectionThread.getName()).append(
280: " is listening on ")
281: .append(serverSocket.toString());
282: getLogger().debug(debugBuffer.toString());
283: }
284: while (!Thread.currentThread().interrupted()
285: && null != serverConnectionThread) {
286: try {
287: Socket clientSocket = null;
288: try {
289: while (maxOpenConn > 0
290: && clientConnectionRunners.size() >= maxOpenConn) {
291: getLogger().warn(
292: "Maximum number of open connections ("
293: + clientConnectionRunners
294: .size() + ") in use.");
295: synchronized (this ) {
296: wait(10000);
297: }
298: }
299:
300: clientSocket = serverSocket.accept();
301:
302: } catch (InterruptedIOException iioe) {
303: // This exception is expected upon ServerConnection shutdown.
304: // See the POLLING_INTERVAL comment
305: continue;
306: } catch (IOException se) {
307: if (ioExceptionCount > 0) {
308: getLogger()
309: .error(
310: "Fatal exception while listening on server socket. Terminating connection.",
311: se);
312: break;
313: } else {
314: continue;
315: }
316: } catch (SecurityException se) {
317: getLogger()
318: .error(
319: "Fatal exception while listening on server socket. Terminating connection.",
320: se);
321: break;
322: }
323: ClientConnectionRunner runner = null;
324: synchronized (clientConnectionRunners) {
325: if ((maxOpenConn > 0)
326: && (clientConnectionRunners.size() >= maxOpenConn)) {
327: if (getLogger().isWarnEnabled()) {
328: getLogger()
329: .warn(
330: "Maximum number of open connections exceeded - refusing connection. Current number of connections is "
331: + clientConnectionRunners
332: .size());
333: if (getLogger().isWarnEnabled()) {
334: Iterator runnerIterator = clientConnectionRunners
335: .iterator();
336: getLogger().info("Connections: ");
337: while (runnerIterator.hasNext()) {
338: getLogger()
339: .info(
340: " "
341: + ((ClientConnectionRunner) runnerIterator
342: .next())
343: .toString());
344: }
345: }
346: }
347: try {
348: clientSocket.close();
349: } catch (IOException ignored) {
350: // We ignore this exception, as we already have an error condition.
351: }
352: continue;
353: } else {
354: clientSocket.setSoTimeout(socketTimeout);
355: runner = addClientConnectionRunner();
356: runner.setSocket(clientSocket);
357: }
358: }
359: setupLogger(runner);
360: try {
361: connThreadPool.execute(runner);
362: } catch (Exception e) {
363: // This error indicates that the underlying thread pool
364: // is out of threads. For robustness, we catch this and
365: // cleanup
366: getLogger()
367: .error(
368: "Internal error - insufficient threads available to service request. "
369: + Thread.activeCount()
370: + " threads in service request pool.",
371: e);
372: try {
373: clientSocket.close();
374: } catch (IOException ignored) {
375: // We ignore this exception, as we already have an error condition.
376: }
377: // In this case, the thread will not remove the client connection runner,
378: // so we must.
379: removeClientConnectionRunner(runner);
380: }
381: } catch (IOException ioe) {
382: getLogger()
383: .error("Exception accepting connection", ioe);
384: } catch (Throwable e) {
385: getLogger().error(
386: "Exception executing client connection runner: "
387: + e.getMessage(), e);
388: }
389: }
390: synchronized (this ) {
391: serverConnectionThread = null;
392: Thread.currentThread().interrupted();
393: notifyAll();
394: }
395: }
396:
397: /**
398: * An inner class to provide the actual body of the thread of execution
399: * that occurs upon a client connection.
400: *
401: */
402: class ClientConnectionRunner extends AbstractLogEnabled implements
403: Poolable, Runnable {
404:
405: /**
406: * The Socket that this client connection is using for transport.
407: */
408: private Socket clientSocket;
409:
410: /**
411: * The thread of execution associated with this client connection.
412: */
413: private Thread clientSocketThread;
414:
415: /**
416: * Returns string for diagnostic logging
417: */
418: public String toString() {
419: return getClass().getName() + " for " + clientSocket
420: + " on " + clientSocketThread;
421: }
422:
423: public ClientConnectionRunner() {
424: }
425:
426: /**
427: * The dispose operation that terminates the runner. Should only be
428: * called by the ServerConnection that owns the ClientConnectionRunner
429: */
430: public void dispose() {
431: synchronized (this ) {
432: if (null != clientSocketThread) {
433: // Execution of this block means that the run() method
434: // hasn't finished yet. So we interrupt the thread
435: // to terminate run() and wait for the run() method
436: // to finish. The notifyAll() at the end of run() will
437: // wake this thread and allow dispose() to end.
438: clientSocketThread.interrupt();
439: clientSocketThread = null;
440: try {
441: wait();
442: } catch (InterruptedException ie) {
443: // Expected - return from the method
444: }
445: }
446: }
447: }
448:
449: /**
450: * Sets the socket for a ClientConnectionRunner.
451: *
452: * @param socket the client socket associated with this ClientConnectionRunner
453: */
454: public void setSocket(Socket socket) {
455: clientSocket = socket;
456: }
457:
458: /**
459: * Provides the body for the thread of execution dealing with a particular client
460: * connection. An appropriate ConnectionHandler is created, applied, executed,
461: * and released.
462: */
463: public void run() {
464: ConnectionHandler handler = null;
465: try {
466: clientSocketThread = Thread.currentThread();
467:
468: handler = ServerConnection.this .handlerFactory
469: .createConnectionHandler();
470: String connectionString = null;
471: if (getLogger().isDebugEnabled()) {
472: connectionString = getConnectionString();
473: String message = "Starting " + connectionString;
474: getLogger().debug(message);
475: }
476:
477: handler.handleConnection(clientSocket);
478:
479: if (getLogger().isDebugEnabled()) {
480: String message = "Ending " + connectionString;
481: getLogger().debug(message);
482: }
483:
484: } catch (Throwable e) {
485: getLogger().error("Error handling connection", e);
486: } finally {
487:
488: // Close the underlying socket
489: try {
490: if (clientSocket != null) {
491: clientSocket.close();
492: }
493: } catch (IOException ioe) {
494: getLogger().warn("Error shutting down connection",
495: ioe);
496: }
497:
498: clientSocket = null;
499:
500: // Null out the thread, notify other threads to encourage
501: // a context switch
502: synchronized (this ) {
503: clientSocketThread = null;
504:
505: Thread.currentThread().interrupted();
506:
507: // Release the handler and kill the reference to the handler factory
508: //
509: // This needs to be done after the clientSocketThread is nulled out,
510: // otherwise we could trash a reused ClientConnectionRunner
511: if (handler != null) {
512: ServerConnection.this .handlerFactory
513: .releaseConnectionHandler(handler);
514: handler = null;
515: }
516:
517: // Remove this runner from the list of active connections.
518: ServerConnection.this
519: .removeClientConnectionRunner(this );
520:
521: notifyAll();
522: }
523: }
524: }
525:
526: /**
527: * Helper method to return a formatted string with connection transport information.
528: *
529: * @return a formatted string
530: */
531: private String getConnectionString() {
532: if (clientSocket == null) {
533: return "invalid socket";
534: }
535: StringBuffer connectionBuffer = new StringBuffer(256)
536: .append("connection on ").append(
537: clientSocket.getLocalAddress()
538: .getHostAddress().toString())
539: .append(":").append(clientSocket.getLocalPort())
540: .append(" from ").append(
541: clientSocket.getInetAddress()
542: .getHostAddress().toString())
543: .append(":").append(clientSocket.getPort());
544: return connectionBuffer.toString();
545: }
546: }
547:
548: /**
549: * The factory for producing handlers.
550: */
551: private class ClientConnectionRunnerFactory implements
552: ObjectFactory {
553:
554: /**
555: * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
556: */
557: public Object newInstance() throws Exception {
558: return new ClientConnectionRunner();
559: }
560:
561: /**
562: * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
563: */
564: public Class getCreatedClass() {
565: return ClientConnectionRunner.class;
566: }
567:
568: /**
569: * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
570: */
571: public void decommission(Object object) throws Exception {
572: return;
573: }
574: }
575: }
|