001: /**************************************************************************************
002: * Copyright (c) Jonas Bonér, Alexandre Vasseur. All rights reserved. *
003: * http://aspectwerkz.codehaus.org *
004: * ---------------------------------------------------------------------------------- *
005: * The software in this package is published under the terms of the LGPL license *
006: * a copy of which has been included with this distribution in the license.txt file. *
007: **************************************************************************************/package org.codehaus.aspectwerkz.connectivity;
008:
009: import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
010: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
011: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
012: import org.codehaus.aspectwerkz.exception.WrappedRuntimeException;
013:
014: import java.io.FileInputStream;
015: import java.io.IOException;
016: import java.net.InetAddress;
017: import java.net.ServerSocket;
018: import java.net.Socket;
019: import java.util.Properties;
020:
021: /**
022: * Server that listens to a specified port for client requests. <p/>The implementation is based on sockets. <p/>The
023: * invoker spawns a specified number of listener threads in which each one of these spawns a new RemoteProxyServerThread
024: * for each client request that comes in. <p/>Uses a thread pool from util.concurrent.
025: *
026: * @author <a href="mailto:jboner@codehaus.org">Jonas BonŽr </a>
027: */
028: public class RemoteProxyServer implements Runnable {
029: private static String HOST_NAME;
030:
031: private static int PORT;
032:
033: private static boolean BOUNDED_THREAD_POOL;
034:
035: private static boolean LISTENER_THREAD_RUN_AS_DAEMON;
036:
037: private static int BACKLOG;
038:
039: private static int NUM_LISTENER_THREADS;
040:
041: private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY;
042:
043: private static int CLIENT_THREAD_TIMEOUT;
044:
045: private static int THREAD_POOL_MAX_SIZE;
046:
047: private static int THREAD_POOL_MIN_SIZE;
048:
049: private static int THREAD_POOL_INIT_SIZE;
050:
051: private static int THREAD_POOL_KEEP_ALIVE_TIME;
052:
053: private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED;
054:
055: /**
056: * Initalize the server properties.
057: */
058: static {
059: Properties properties = new Properties();
060: try {
061: properties.load(new FileInputStream(System
062: .getProperty("aspectwerkz.resource.bundle")));
063: } catch (Exception e) {
064: System.out
065: .println("no aspectwerkz resource bundle found on classpath, using defaults");
066:
067: // ignore, use defaults
068: }
069: String property = properties
070: .getProperty("remote.server.hostname");
071: if (property == null) {
072: HOST_NAME = property;
073: } else {
074: HOST_NAME = property;
075: }
076: property = properties.getProperty("remote.server.port");
077: if (property == null) {
078: PORT = 7777;
079: } else {
080: PORT = Integer.parseInt(property);
081: }
082: property = properties
083: .getProperty("remote.server.listener.threads.backlog");
084: if (property == null) {
085: BACKLOG = 200;
086: } else {
087: BACKLOG = Integer.parseInt(property);
088: }
089: property = properties
090: .getProperty("remote.server.listener.threads.nr");
091: if (property == null) {
092: NUM_LISTENER_THREADS = 10;
093: } else {
094: NUM_LISTENER_THREADS = Integer.parseInt(property);
095: }
096: property = properties
097: .getProperty("remote.server.client.threads.timeout");
098: if (property == null) {
099: CLIENT_THREAD_TIMEOUT = 60000;
100: } else {
101: CLIENT_THREAD_TIMEOUT = Integer.parseInt(property);
102: }
103: property = properties
104: .getProperty("remote.server.thread.pool.max.size");
105: if (property == null) {
106: THREAD_POOL_MAX_SIZE = 100;
107: } else {
108: THREAD_POOL_MAX_SIZE = Integer.parseInt(property);
109: }
110: property = properties
111: .getProperty("remote.server.thread.pool.min.size");
112: if (property == null) {
113: THREAD_POOL_MIN_SIZE = 10;
114: } else {
115: THREAD_POOL_MIN_SIZE = Integer.parseInt(property);
116: }
117: property = properties
118: .getProperty("remote.server.thread.pool.init.size");
119: if (property == null) {
120: THREAD_POOL_INIT_SIZE = 10;
121: } else {
122: THREAD_POOL_INIT_SIZE = Integer.parseInt(property);
123: }
124: property = properties
125: .getProperty("remote.server.thread.pool.keep.alive.time");
126: if (property == null) {
127: THREAD_POOL_KEEP_ALIVE_TIME = 300000;
128: } else {
129: THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property);
130: }
131: property = properties
132: .getProperty("remote.server.thread.pool.type");
133: if ((property != null) && property.equals("dynamic")) {
134: BOUNDED_THREAD_POOL = false;
135: } else {
136: BOUNDED_THREAD_POOL = true;
137: }
138: property = properties
139: .getProperty("remote.server.listener.threads.run.as.daemon");
140: if ((property != null) && property.equals("true")) {
141: LISTENER_THREAD_RUN_AS_DAEMON = true;
142: } else {
143: LISTENER_THREAD_RUN_AS_DAEMON = false;
144: }
145: property = properties
146: .getProperty("remote.server.thread.pool.wait.when.blocked");
147: if ((property != null) && property.equals("true")) {
148: THREAD_POOL_WAIT_WHEN_BLOCKED = true;
149: } else {
150: THREAD_POOL_WAIT_WHEN_BLOCKED = false;
151: }
152: }
153:
154: /**
155: * The server socket.
156: */
157: private ServerSocket m_serverSocket = null;
158:
159: /**
160: * All listener threads.
161: */
162: private Thread[] m_listenerThreads = null;
163:
164: /**
165: * The thread pool.
166: */
167: private PooledExecutor m_threadPool = null;
168:
169: /**
170: * The class loader to use.
171: */
172: private ClassLoader m_loader = null;
173:
174: /**
175: * The invoker instance.
176: */
177: private Invoker m_invoker = null;
178:
179: /**
180: * Marks the server as running.
181: */
182: private boolean m_running = true;
183:
184: /**
185: * Starts a server object and starts listening for client access.
186: *
187: * @param loader the classloader to use
188: * @param invoker the invoker that makes the method invocation in the client thread
189: */
190: public RemoteProxyServer(final ClassLoader loader,
191: final Invoker invoker) {
192: m_invoker = invoker;
193: m_loader = loader;
194: }
195:
196: /**
197: * Starts up the proxy server.
198: */
199: public void start() {
200: m_running = true;
201: try {
202: InetAddress bindAddress = InetAddress.getByName(HOST_NAME);
203: m_serverSocket = new ServerSocket(PORT, BACKLOG,
204: bindAddress);
205: if (BOUNDED_THREAD_POOL) {
206: createBoundedThreadPool(THREAD_POOL_MAX_SIZE,
207: THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE,
208: THREAD_POOL_KEEP_ALIVE_TIME,
209: THREAD_POOL_WAIT_WHEN_BLOCKED);
210: } else {
211: createDynamicThreadPool(THREAD_POOL_MIN_SIZE,
212: THREAD_POOL_INIT_SIZE,
213: THREAD_POOL_KEEP_ALIVE_TIME);
214: }
215: m_listenerThreads = new Thread[NUM_LISTENER_THREADS];
216: for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
217: m_listenerThreads[i] = new Thread(this );
218: m_listenerThreads[i].setName("AspectWerkz::Listener "
219: + (i + 1));
220: m_listenerThreads[i]
221: .setDaemon(LISTENER_THREAD_RUN_AS_DAEMON);
222: m_listenerThreads[i]
223: .setPriority(LISTENER_THREAD_PRIORITY);
224: m_listenerThreads[i].start();
225: }
226: } catch (IOException e) {
227: throw new WrappedRuntimeException(e);
228: }
229: }
230:
231: /**
232: * Stops the socket proxy server.
233: */
234: public void stop() {
235: m_running = false;
236: for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
237: m_listenerThreads[i].interrupt();
238: }
239: m_threadPool.shutdownNow();
240: }
241:
242: /**
243: * Does the actual work of listening for a client request and spawns a new RemoteProxyServerThread to serve the
244: * client.
245: */
246: public void run() {
247: try {
248: while (m_running) {
249: final Socket clientSocket = m_serverSocket.accept();
250: synchronized (m_threadPool) {
251: m_threadPool.execute(new RemoteProxyServerThread(
252: clientSocket, m_loader, m_invoker,
253: CLIENT_THREAD_TIMEOUT));
254: }
255: }
256: m_serverSocket.close();
257: } catch (Exception e) {
258: throw new WrappedRuntimeException(e);
259: }
260: }
261:
262: /**
263: * Creates a new bounded thread pool.
264: *
265: * @param threadPoolMaxSize
266: * @param threadPoolMinSize
267: * @param threadPoolInitSize
268: * @param keepAliveTime
269: * @param waitWhenBlocked
270: */
271: private void createBoundedThreadPool(final int threadPoolMaxSize,
272: final int threadPoolMinSize, final int threadPoolInitSize,
273: final int keepAliveTime, final boolean waitWhenBlocked) {
274: m_threadPool = new PooledExecutor(new BoundedBuffer(
275: threadPoolInitSize), threadPoolMaxSize);
276: m_threadPool.setKeepAliveTime(keepAliveTime);
277: m_threadPool.createThreads(threadPoolInitSize);
278: m_threadPool.setMinimumPoolSize(threadPoolMinSize);
279: if (waitWhenBlocked) {
280: m_threadPool.waitWhenBlocked();
281: }
282: }
283:
284: /**
285: * Creates a new dynamic thread pool
286: *
287: * @param threadPoolMinSize
288: * @param threadPoolInitSize
289: * @param keepAliveTime
290: */
291: private void createDynamicThreadPool(final int threadPoolMinSize,
292: final int threadPoolInitSize, final int keepAliveTime) {
293: m_threadPool = new PooledExecutor(new LinkedQueue());
294: m_threadPool.setKeepAliveTime(keepAliveTime);
295: m_threadPool.createThreads(threadPoolInitSize);
296: m_threadPool.setMinimumPoolSize(threadPoolMinSize);
297: }
298: }
|