001: /* ====================================================================
002: The Jicarilla Software License
003:
004: Copyright (c) 2003 Leo Simons.
005: All rights reserved.
006:
007: Permission is hereby granted, free of charge, to any person obtaining
008: a copy of this software and associated documentation files (the
009: "Software"), to deal in the Software without restriction, including
010: without limitation the rights to use, copy, modify, merge, publish,
011: distribute, sublicense, and/or sell copies of the Software, and to
012: permit persons to whom the Software is furnished to do so, subject to
013: the following conditions:
014:
015: The above copyright notice and this permission notice shall be
016: included in all copies or substantial portions of the Software.
017:
018: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
019: EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
020: MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
021: IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
022: CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
023: TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
024: SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
025: ==================================================================== */
026: package org.jicarilla.net;
027:
028: import EDU.oswego.cs.dl.util.concurrent.Channel;
029: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
030: import org.apache.commons.pool.ObjectPool;
031: import org.jicarilla.lang.ExceptionListener;
032: import org.jicarilla.lang.AbstractActive;
033: import org.jicarilla.lang.Assert;
034: import org.jicarilla.plumbing.Sink;
035:
036: import java.io.IOException;
037: import java.net.InetAddress;
038: import java.net.InetSocketAddress;
039: import java.net.ServerSocket;
040: import java.net.SocketAddress;
041: import java.nio.channels.ServerSocketChannel;
042: import java.nio.channels.SocketChannel;
043:
044: /**
045: * This is a basic New-I/O socket server.
046: *
047: * On startup, a configurable number of threads is
048: * created which listen for requests and delegate
049: * them to the handler once basic parsing has been
050: * done.
051: *
052: * All other processing is then delegated to a handler
053: * from the handler pool.
054: *
055: * In case of any unusual failure, a fallback error
056: * handler is used.
057: *
058: * This is a mostly type-3-compatible IoC component with
059: * two additional lifecycle methods: initialize() and
060: * releaseInstance(). The additional lifecycle is necessary
061: * because we should not create threads in the
062: * constructor.
063: *
064: * @author <a href="mailto: lsimons at jicarilla dot org">Leo Simons</a>
065: * @version $Id: SocketServerImpl.java,v 1.7 2004/03/23 13:37:50 lsimons Exp $
066: */
067: public class SocketServerImpl extends AbstractActive implements
068: SocketServer {
069: public final static int SLEEP_DURING_DISPOSE = 2000;
070: public final static int MAXIMUM_PORT_NUMBER = 65536;
071:
072: // ----------------------------------------------------------------------
073: // Properties
074: // ----------------------------------------------------------------------
075: protected int m_port;
076: protected int m_backlog;
077: protected InetAddress m_address;
078: protected String m_rootDirectory;
079: protected int m_numThreads;
080:
081: protected ExceptionListener m_exceptionListener;
082: protected ServerSocketChannel m_channel;
083: protected ObjectPool m_eventPool;
084: protected ObjectPool m_handlerPool;
085: protected Sink m_errorHandler;
086: protected PooledExecutor m_threadPool;
087:
088: protected StoppableRunnable[] m_runners;
089:
090: // ----------------------------------------------------------------------
091: // Constructor
092: // ----------------------------------------------------------------------
093: public SocketServerImpl(final SocketServerConfig config,
094: final ExceptionListener exceptionListener,
095: final ObjectPool eventPool, final ObjectPool handlerPool,
096: final Sink errorHandler, final PooledExecutor threadPool)
097: throws IOException {
098: // configuration
099: setNumThreads(config.getNumberOfThreads());
100: setPort(config.getPort());
101: setBacklog(config.getBacklog());
102: setAddress(InetAddress.getByName(config.getAddress()));
103:
104: // composition
105: setEventPool(eventPool);
106: setHandlerPool(handlerPool);
107: setErrorHandler(errorHandler);
108: setThreadPool(threadPool);
109: setExceptionListener(exceptionListener);
110:
111: final ServerSocketChannel channel = ServerSocketChannel.open();
112: final SocketAddress address = new InetSocketAddress(
113: getAddress(), getPort());
114: channel.socket().bind(address, getBacklog());
115: setChannel(channel);
116: }
117:
118: // to allow type-2 initialization
119: protected SocketServerImpl() {
120: }
121:
122: // ----------------------------------------------------------------------
123: // Contract: AbstractActive
124: // ----------------------------------------------------------------------
125:
126: protected void doInitialize() throws Throwable {
127: final SocketServerImpl server = this ;
128: m_runners = new StoppableRunnable[m_numThreads];
129:
130: for (int i = 0; i < m_numThreads; i++) {
131: m_runners[i] = createRunner(server);
132: executeRunner(m_runners[i]);
133: }
134: }
135:
136: protected void doDispose() throws IOException, InterruptedException {
137: if (m_stopped)
138: return;
139:
140: for (int i = 0; i < m_runners.length; i++)
141: m_runners[i].stop();
142:
143: // give runners 2 seconds to finish pending requests...
144: Thread.sleep(SLEEP_DURING_DISPOSE);
145:
146: getChannel().close();
147: getSocket().close();
148:
149: m_channel = null;
150: }
151:
152: public void finalize() throws Throwable {
153: dispose();
154: super .finalize();
155: }
156:
157: // ----------------------------------------------------------------------
158: // Getters/Setters
159: // ----------------------------------------------------------------------
160: protected int getPort() {
161: return m_port;
162: }
163:
164: protected void setPort(final int port) {
165: Assert.assertTrue("port must be between 0 and 65536", port >= 0
166: && port <= MAXIMUM_PORT_NUMBER);
167: m_port = port;
168: }
169:
170: protected InetAddress getAddress() {
171: return m_address;
172: }
173:
174: protected void setAddress(final InetAddress address) {
175: Assert.assertNotNull("address argument may not be null",
176: address);
177: m_address = address;
178: }
179:
180: protected String getRootDirectory() {
181: return m_rootDirectory;
182: }
183:
184: protected void setRootDirectory(final String rootDirectory) {
185: Assert.assertNotNull("rootDirectory argument may not be null",
186: rootDirectory);
187: m_rootDirectory = rootDirectory;
188: }
189:
190: protected ExceptionListener getExceptionListener() {
191: return m_exceptionListener;
192: }
193:
194: protected void setExceptionListener(
195: final ExceptionListener exceptionListener) {
196: Assert.assertNotNull(
197: "exceptionListener argument may not be null",
198: exceptionListener);
199: m_exceptionListener = exceptionListener;
200: }
201:
202: protected ServerSocket getSocket() {
203: return getChannel().socket();
204: //return m_socket;
205: }
206:
207: protected void setSocket(final ServerSocket socket) {
208: Assert.assertNotNull("socket argument may not be null", socket);
209: //m_socket = socket;
210: setChannel(socket.getChannel());
211: }
212:
213: protected ServerSocketChannel getChannel() {
214: return m_channel;
215: }
216:
217: protected void setChannel(final ServerSocketChannel channel) {
218: Assert.assertNotNull("channel argument may not be null",
219: channel);
220: m_channel = channel;
221: }
222:
223: protected int getBacklog() {
224: return m_backlog;
225: }
226:
227: protected void setBacklog(final int backlog) {
228: Assert.assertTrue("backlog argument must be 0 or bigger",
229: backlog >= 0);
230: m_backlog = backlog;
231: }
232:
233: protected int getNumThreads() {
234: return m_numThreads;
235: }
236:
237: protected void setNumThreads(final int numThreads) {
238: Assert.assertTrue("numThreads argument must be 1 or bigger",
239: numThreads > 0);
240: m_numThreads = numThreads;
241: }
242:
243: protected ObjectPool getEventPool() {
244: return m_eventPool;
245: }
246:
247: protected void setEventPool(final ObjectPool eventPool) {
248: Assert.assertNotNull("eventPool argument may not be null",
249: eventPool);
250: m_eventPool = eventPool;
251: }
252:
253: protected ObjectPool getHandlerPool() {
254: return m_handlerPool;
255: }
256:
257: protected void setHandlerPool(final ObjectPool handlerPool) {
258: Assert.assertNotNull("handlerPool argument may not be null",
259: handlerPool);
260: m_handlerPool = handlerPool;
261: }
262:
263: protected Sink getErrorHandler() {
264: return m_errorHandler;
265: }
266:
267: protected void setErrorHandler(final Sink errorHandler) {
268: Assert.assertNotNull("errorHandler argument may not be null",
269: errorHandler);
270: m_errorHandler = errorHandler;
271: }
272:
273: protected PooledExecutor getThreadPool() {
274: return m_threadPool;
275: }
276:
277: protected void setThreadPool(final PooledExecutor threadPool) {
278: Assert.assertNotNull("threadPool argument may not be null",
279: threadPool);
280: m_threadPool = threadPool;
281: }
282:
283: protected StoppableRunnable[] getRunners() {
284: return m_runners;
285: }
286:
287: // ----------------------------------------------------------------------
288: // Helper Methods
289: // ----------------------------------------------------------------------
290:
291: protected StoppableRunnable createRunner(
292: final SocketServerImpl server) {
293: return new RequestHandlingWorker(server);
294: }
295:
296: protected void executeRunner(final Runnable r) throws Throwable {
297: try {
298: m_threadPool.execute(r);
299: } catch (InterruptedException ie) {
300: try {
301: dispose();
302: } catch (Exception e) {
303: }
304: throw ie;
305: }
306: }
307:
308: /**
309: * Called from worker threads to accept new socket
310: * connections.
311: */
312: protected void handleRequest() throws InterruptedException {
313: if (!getChannel().isOpen())
314: throw new InterruptedException();
315:
316: SocketChannel channel = null;
317: try {
318: channel = getChannel().accept();
319: handleRequest(channel);
320: } catch (IOException ioe) {
321: handleRequestException(channel, ioe);
322: } catch (SocketServerException se) {
323: handleRequestException(channel, se);
324: }
325: }
326:
327: /**
328: * Called from {@link #handleRequest()} to do the
329: * actual work and may be called from elsewhere to
330: * continue communication on open sockets.
331: *
332: * @param channel the channel to get a request from
333: */
334: protected void handleRequest(final SocketChannel channel)
335: throws InterruptedException, SocketServerException {
336: final Event event = borrowEvent();
337: final Channel handler = borrowChannel();
338:
339: event.setChannel(channel);
340:
341: // run the channel
342: setPoolReturningSink(handler, event);
343: handler.put(event);
344: }
345:
346: /**
347: * Appends a handler to the end of an event handler
348: * chain that will return pooled objects back to
349: * from whence they came.
350: *
351: * @param handler
352: * @param event
353: */
354: protected final void setPoolReturningSink(final Channel handler,
355: final Event event) throws InterruptedException {
356: final Sink sink = new PoolReturningSinkSink(handler);
357: m_threadPool.execute(new PoolReturningWorker(handler, sink,
358: event));
359: }
360:
361: protected void handleRequestException(final SocketChannel channel,
362: final Exception ex) {
363: try {
364: if (channel != null)
365: channel.close();
366: } catch (IOException e) { /* too bad */
367: }
368:
369: getExceptionListener().exceptionOccurred(ex);
370: }
371:
372: protected Channel borrowChannel() throws SocketServerException {
373: try {
374: return (Channel) getHandlerPool().borrowObject();
375: } catch (Exception e) {
376: throw new SocketServerException(
377: "Can't create a new channel!", e);
378: }
379: }
380:
381: protected Event borrowEvent() throws SocketServerException {
382: try {
383: return (Event) getEventPool().borrowObject();
384: } catch (Exception e) {
385: throw new SocketServerException(
386: "Can't create a new event!", e);
387: }
388: }
389:
390: // ----------------------------------------------------------------------
391: // Inner Interface: StoppableRunnable
392: // ----------------------------------------------------------------------
393:
394: protected interface StoppableRunnable extends Runnable {
395: void stop();
396: }
397:
398: // ----------------------------------------------------------------------
399: // Inner Class: RequestHandlingWorker
400: // ----------------------------------------------------------------------
401:
402: protected static class RequestHandlingWorker implements
403: StoppableRunnable {
404: protected boolean m_running = true;
405: private final SocketServerImpl m_server;
406:
407: protected RequestHandlingWorker(final SocketServerImpl server) {
408: m_server = server;
409: }
410:
411: public void run() {
412: while (m_running) {
413: try {
414: m_server.handleRequest();
415: Thread.yield();
416: } catch (InterruptedException ie) {
417: m_running = false;
418: Thread.currentThread().interrupt();
419: }
420: }
421: }
422:
423: public void stop() {
424: m_running = false;
425: }
426: }
427:
428: // ----------------------------------------------------------------------
429: // Inner Class: PoolReturningSink
430: // ----------------------------------------------------------------------
431:
432: protected class PoolReturningSinkSink implements Sink {
433: protected final Channel m_handler;
434:
435: protected PoolReturningSinkSink(final Channel handler) {
436: m_handler = handler;
437: }
438:
439: public void put(final Object o) {
440: try {
441: getHandlerPool().returnObject(m_handler);
442: } catch (Exception e) {
443: }
444:
445: try {
446: getEventPool().returnObject(o);
447: } catch (Exception e) {
448: }
449: }
450:
451: public boolean offer(final Object o, final long l) {
452: put(o);
453: return true;
454: }
455: }
456:
457: // ----------------------------------------------------------------------
458: // Inner Class: PoolReturningWorker
459: // ----------------------------------------------------------------------
460:
461: protected static class PoolReturningWorker implements Runnable {
462: protected final Channel m_handler;
463: protected final Sink m_sink;
464: protected final Event m_event;
465:
466: protected PoolReturningWorker(final Channel handler,
467: final Sink sink, final Event event) {
468: m_handler = handler;
469: m_sink = sink;
470: m_event = event;
471: }
472:
473: public void run() {
474: try {
475: m_handler.take(); // block
476: m_sink.put(m_event); // cleanup
477: } catch (InterruptedException ie) {
478: Thread.currentThread().interrupt();
479: }
480: }
481: }
482:
483: }
|