001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.aspectwerkz.connectivity;
005:
006: import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
007: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
008: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
009:
010: import com.tc.aspectwerkz.exception.WrappedRuntimeException;
011:
012: import java.io.FileInputStream;
013: import java.io.IOException;
014: import java.net.InetAddress;
015: import java.net.ServerSocket;
016: import java.net.Socket;
017: import java.util.Properties;
018:
019: /**
020: * Producer that listens to a specified port for client requests. <p/>The implementation is based on sockets. <p/>The
021: * invoker spawns a specified number of listener threads in which each one of these spawns a new RemoteProxyServerThread
022: * for each client request that comes in. <p/>Uses a thread pool from util.concurrent.
023: *
024: * @author <a href="mailto:jboner@codehaus.org">Jonas BonŽr </a>
025: */
026: public class RemoteProxyServer implements Runnable {
027: private static String HOST_NAME;
028:
029: private static int PORT;
030:
031: private static boolean BOUNDED_THREAD_POOL;
032:
033: private static boolean LISTENER_THREAD_RUN_AS_DAEMON;
034:
035: private static int BACKLOG;
036:
037: private static int NUM_LISTENER_THREADS;
038:
039: private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY;
040:
041: private static int CLIENT_THREAD_TIMEOUT;
042:
043: private static int THREAD_POOL_MAX_SIZE;
044:
045: private static int THREAD_POOL_MIN_SIZE;
046:
047: private static int THREAD_POOL_INIT_SIZE;
048:
049: private static int THREAD_POOL_KEEP_ALIVE_TIME;
050:
051: private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED;
052:
053: /**
054: * Initalize the server properties.
055: */
056: static {
057: Properties properties = new Properties();
058: try {
059: properties.load(new FileInputStream(System
060: .getProperty("aspectwerkz.resource.bundle")));
061: } catch (Exception e) {
062: System.out
063: .println("no aspectwerkz resource bundle found on classpath, using defaults");
064:
065: // ignore, use defaults
066: }
067: String property = properties
068: .getProperty("remote.server.hostname");
069: if (property == null) {
070: HOST_NAME = property;
071: } else {
072: HOST_NAME = property;
073: }
074: property = properties.getProperty("remote.server.port");
075: if (property == null) {
076: PORT = 7777;
077: } else {
078: PORT = Integer.parseInt(property);
079: }
080: property = properties
081: .getProperty("remote.server.listener.threads.backlog");
082: if (property == null) {
083: BACKLOG = 200;
084: } else {
085: BACKLOG = Integer.parseInt(property);
086: }
087: property = properties
088: .getProperty("remote.server.listener.threads.nr");
089: if (property == null) {
090: NUM_LISTENER_THREADS = 10;
091: } else {
092: NUM_LISTENER_THREADS = Integer.parseInt(property);
093: }
094: property = properties
095: .getProperty("remote.server.client.threads.timeout");
096: if (property == null) {
097: CLIENT_THREAD_TIMEOUT = 60000;
098: } else {
099: CLIENT_THREAD_TIMEOUT = Integer.parseInt(property);
100: }
101: property = properties
102: .getProperty("remote.server.thread.pool.max.size");
103: if (property == null) {
104: THREAD_POOL_MAX_SIZE = 100;
105: } else {
106: THREAD_POOL_MAX_SIZE = Integer.parseInt(property);
107: }
108: property = properties
109: .getProperty("remote.server.thread.pool.min.size");
110: if (property == null) {
111: THREAD_POOL_MIN_SIZE = 10;
112: } else {
113: THREAD_POOL_MIN_SIZE = Integer.parseInt(property);
114: }
115: property = properties
116: .getProperty("remote.server.thread.pool.init.size");
117: if (property == null) {
118: THREAD_POOL_INIT_SIZE = 10;
119: } else {
120: THREAD_POOL_INIT_SIZE = Integer.parseInt(property);
121: }
122: property = properties
123: .getProperty("remote.server.thread.pool.keep.alive.time");
124: if (property == null) {
125: THREAD_POOL_KEEP_ALIVE_TIME = 300000;
126: } else {
127: THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property);
128: }
129: property = properties
130: .getProperty("remote.server.thread.pool.type");
131: if ((property != null) && property.equals("dynamic")) {
132: BOUNDED_THREAD_POOL = false;
133: } else {
134: BOUNDED_THREAD_POOL = true;
135: }
136: property = properties
137: .getProperty("remote.server.listener.threads.run.as.daemon");
138: if ((property != null) && property.equals("true")) {
139: LISTENER_THREAD_RUN_AS_DAEMON = true;
140: } else {
141: LISTENER_THREAD_RUN_AS_DAEMON = false;
142: }
143: property = properties
144: .getProperty("remote.server.thread.pool.wait.when.blocked");
145: if ((property != null) && property.equals("true")) {
146: THREAD_POOL_WAIT_WHEN_BLOCKED = true;
147: } else {
148: THREAD_POOL_WAIT_WHEN_BLOCKED = false;
149: }
150: }
151:
152: /**
153: * The server socket.
154: */
155: private ServerSocket m_serverSocket = null;
156:
157: /**
158: * All listener threads.
159: */
160: private Thread[] m_listenerThreads = null;
161:
162: /**
163: * The thread pool.
164: */
165: private PooledExecutor m_threadPool = null;
166:
167: /**
168: * The class loader to use.
169: */
170: private ClassLoader m_loader = null;
171:
172: /**
173: * The invoker instance.
174: */
175: private Invoker m_invoker = null;
176:
177: /**
178: * Marks the server as running.
179: */
180: private boolean m_running = true;
181:
182: /**
183: * Starts a server object and starts listening for client access.
184: *
185: * @param loader the classloader to use
186: * @param invoker the invoker that makes the method invocation in the client thread
187: */
188: public RemoteProxyServer(final ClassLoader loader,
189: final Invoker invoker) {
190: m_invoker = invoker;
191: m_loader = loader;
192: }
193:
194: /**
195: * Starts up the proxy server.
196: */
197: public void start() {
198: m_running = true;
199: try {
200: InetAddress bindAddress = InetAddress.getByName(HOST_NAME);
201: m_serverSocket = new ServerSocket(PORT, BACKLOG,
202: bindAddress);
203: if (BOUNDED_THREAD_POOL) {
204: createBoundedThreadPool(THREAD_POOL_MAX_SIZE,
205: THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE,
206: THREAD_POOL_KEEP_ALIVE_TIME,
207: THREAD_POOL_WAIT_WHEN_BLOCKED);
208: } else {
209: createDynamicThreadPool(THREAD_POOL_MIN_SIZE,
210: THREAD_POOL_INIT_SIZE,
211: THREAD_POOL_KEEP_ALIVE_TIME);
212: }
213: m_listenerThreads = new Thread[NUM_LISTENER_THREADS];
214: for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
215: m_listenerThreads[i] = new Thread(this );
216: m_listenerThreads[i].setName("AspectWerkz::Listener "
217: + (i + 1));
218: m_listenerThreads[i]
219: .setDaemon(LISTENER_THREAD_RUN_AS_DAEMON);
220: m_listenerThreads[i]
221: .setPriority(LISTENER_THREAD_PRIORITY);
222: m_listenerThreads[i].start();
223: }
224: } catch (IOException e) {
225: throw new WrappedRuntimeException(e);
226: }
227: }
228:
229: /**
230: * Stops the socket proxy server.
231: */
232: public void stop() {
233: m_running = false;
234: for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
235: m_listenerThreads[i].interrupt();
236: }
237: m_threadPool.shutdownNow();
238: }
239:
240: /**
241: * Does the actual work of listening for a client request and spawns a new RemoteProxyServerThread to serve the
242: * client.
243: */
244: public void run() {
245: try {
246: while (m_running) {
247: final Socket clientSocket = m_serverSocket.accept();
248: synchronized (m_threadPool) {
249: m_threadPool.execute(new RemoteProxyServerThread(
250: clientSocket, m_loader, m_invoker,
251: CLIENT_THREAD_TIMEOUT));
252: }
253: }
254: m_serverSocket.close();
255: } catch (Exception e) {
256: throw new WrappedRuntimeException(e);
257: }
258: }
259:
260: /**
261: * Creates a new bounded thread pool.
262: *
263: * @param threadPoolMaxSize
264: * @param threadPoolMinSize
265: * @param threadPoolInitSize
266: * @param keepAliveTime
267: * @param waitWhenBlocked
268: */
269: private void createBoundedThreadPool(final int threadPoolMaxSize,
270: final int threadPoolMinSize, final int threadPoolInitSize,
271: final int keepAliveTime, final boolean waitWhenBlocked) {
272: m_threadPool = new PooledExecutor(new BoundedBuffer(
273: threadPoolInitSize), threadPoolMaxSize);
274: m_threadPool.setKeepAliveTime(keepAliveTime);
275: m_threadPool.createThreads(threadPoolInitSize);
276: m_threadPool.setMinimumPoolSize(threadPoolMinSize);
277: if (waitWhenBlocked) {
278: m_threadPool.waitWhenBlocked();
279: }
280: }
281:
282: /**
283: * Creates a new dynamic thread pool
284: *
285: * @param threadPoolMinSize
286: * @param threadPoolInitSize
287: * @param keepAliveTime
288: */
289: private void createDynamicThreadPool(final int threadPoolMinSize,
290: final int threadPoolInitSize, final int keepAliveTime) {
291: m_threadPool = new PooledExecutor(new LinkedQueue());
292: m_threadPool.setKeepAliveTime(keepAliveTime);
293: m_threadPool.createThreads(threadPoolInitSize);
294: m_threadPool.setMinimumPoolSize(threadPoolMinSize);
295: }
296: }
|