001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.net.SocketAddress;
023: import java.util.Collections;
024: import java.util.HashMap;
025: import java.util.HashSet;
026: import java.util.Iterator;
027: import java.util.List;
028: import java.util.Map;
029: import java.util.Queue;
030: import java.util.Set;
031: import java.util.concurrent.ConcurrentLinkedQueue;
032: import java.util.concurrent.Executor;
033: import java.util.concurrent.ExecutorService;
034: import java.util.concurrent.LinkedBlockingQueue;
035: import java.util.concurrent.RejectedExecutionException;
036: import java.util.concurrent.ThreadPoolExecutor;
037: import java.util.concurrent.TimeUnit;
038: import java.util.concurrent.atomic.AtomicInteger;
039:
040: import org.apache.mina.util.NamePreservingRunnable;
041:
042: /**
043: * @author The Apache MINA Project (dev@mina.apache.org)
044: * @version $Rev: 605069 $, $Date: 2007-12-17 19:47:03 -0700 (Mon, 17 Dec 2007) $
045: */
046: public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
047: extends AbstractIoAcceptor {
048:
049: private static final AtomicInteger id = new AtomicInteger();
050:
051: private final Executor executor;
052: private final boolean createdExecutor;
053: private final String threadName;
054: private final IoProcessor<T> processor;
055: private final boolean createdProcessor;
056:
057: private final Object lock = new Object();
058:
059: private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
060: private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
061:
062: private final Map<SocketAddress, H> boundHandles = Collections
063: .synchronizedMap(new HashMap<SocketAddress, H>());
064:
065: private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
066: private volatile boolean selectable;
067: private Worker worker;
068:
069: /**
070: * Create an acceptor with a single processing thread using a NewThreadExecutor
071: */
072: protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
073: Class<? extends IoProcessor<T>> processorClass) {
074: this (sessionConfig, null, new SimpleIoProcessorPool<T>(
075: processorClass), true);
076: }
077:
078: protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
079: Class<? extends IoProcessor<T>> processorClass,
080: int processorCount) {
081: this (sessionConfig, null, new SimpleIoProcessorPool<T>(
082: processorClass, processorCount), true);
083: }
084:
085: protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
086: IoProcessor<T> processor) {
087: this (sessionConfig, null, processor, false);
088: }
089:
090: protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
091: Executor executor, IoProcessor<T> processor) {
092: this (sessionConfig, executor, processor, false);
093: }
094:
095: private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
096: Executor executor, IoProcessor<T> processor,
097: boolean createdProcessor) {
098: super (sessionConfig);
099:
100: if (processor == null) {
101: throw new NullPointerException("processor");
102: }
103:
104: if (executor == null) {
105: this .executor = new ThreadPoolExecutor(1, 1, 1L,
106: TimeUnit.SECONDS,
107: new LinkedBlockingQueue<Runnable>());
108: this .createdExecutor = true;
109: } else {
110: this .executor = executor;
111: this .createdExecutor = false;
112: }
113:
114: this .threadName = getClass().getSimpleName() + '-'
115: + id.incrementAndGet();
116: this .processor = processor;
117: this .createdProcessor = createdProcessor;
118:
119: try {
120: init();
121: selectable = true;
122: } catch (RuntimeException e) {
123: throw e;
124: } catch (Exception e) {
125: throw new RuntimeIoException("Failed to initialize.", e);
126: } finally {
127: if (!selectable) {
128: try {
129: destroy();
130: } catch (Exception e) {
131: ExceptionMonitor.getInstance().exceptionCaught(e);
132: }
133: }
134: }
135: }
136:
137: protected abstract void init() throws Exception;
138:
139: protected abstract void destroy() throws Exception;
140:
141: protected abstract boolean select() throws Exception;
142:
143: protected abstract void wakeup();
144:
145: protected abstract Iterator<H> selectedHandles();
146:
147: protected abstract H open(SocketAddress localAddress)
148: throws Exception;
149:
150: protected abstract SocketAddress localAddress(H handle)
151: throws Exception;
152:
153: protected abstract T accept(IoProcessor<T> processor, H handle)
154: throws Exception;
155:
156: protected abstract void close(H handle) throws Exception;
157:
158: @Override
159: protected IoFuture dispose0() throws Exception {
160: unbind();
161: if (!disposalFuture.isDone()) {
162: try {
163: startupWorker();
164: wakeup();
165: } catch (RejectedExecutionException e) {
166: if (createdExecutor) {
167: // Ignore.
168: } else {
169: throw e;
170: }
171: }
172: }
173: return disposalFuture;
174: }
175:
176: @Override
177: protected final Set<SocketAddress> bind0(
178: List<? extends SocketAddress> localAddresses)
179: throws Exception {
180: AcceptorOperationFuture request = new AcceptorOperationFuture(
181: localAddresses);
182:
183: // adds the Registration request to the queue for the Workers
184: // to handle
185: registerQueue.add(request);
186:
187: // creates an instance of a Worker and has the local
188: // executor kick it off.
189: startupWorker();
190: wakeup();
191: request.awaitUninterruptibly();
192:
193: if (request.getException() != null) {
194: throw request.getException();
195: }
196:
197: // Update the local addresses.
198: // setLocalAddresses() shouldn't be called from the worker thread
199: // because of deadlock.
200: Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
201: for (H handle : boundHandles.values()) {
202: newLocalAddresses.add(localAddress(handle));
203: }
204:
205: return newLocalAddresses;
206: }
207:
208: /**
209: * This method is called by the doBind() and doUnbind()
210: * methods. If the worker object is not null, presumably
211: * the acceptor is starting up, then the worker object will
212: * be created and kicked off by the executor. If the worker
213: * object is not null, probably already created and this class
214: * is now working, then nothing will happen and the method
215: * will just return.
216: */
217: private void startupWorker() {
218: if (!selectable) {
219: registerQueue.clear();
220: cancelQueue.clear();
221: }
222:
223: synchronized (lock) {
224: if (worker == null) {
225: worker = new Worker();
226: executor.execute(new NamePreservingRunnable(worker,
227: threadName));
228: }
229: }
230: }
231:
232: @Override
233: protected final void unbind0(
234: List<? extends SocketAddress> localAddresses)
235: throws Exception {
236: AcceptorOperationFuture future = new AcceptorOperationFuture(
237: localAddresses);
238:
239: cancelQueue.add(future);
240: startupWorker();
241: wakeup();
242:
243: future.awaitUninterruptibly();
244: if (future.getException() != null) {
245: throw future.getException();
246: }
247: }
248:
249: /**
250: * This class is called by the startupWorker() method and is
251: * placed into a NamePreservingRunnable class.
252: */
253: private class Worker implements Runnable {
254: public void run() {
255: int nHandles = 0;
256:
257: while (selectable) {
258: try {
259: // gets the number of keys that are ready to go
260: boolean selected = select();
261:
262: // this actually sets the selector to OP_ACCEPT,
263: // and binds to the port in which this class will
264: // listen on
265: nHandles += registerHandles();
266:
267: if (selected) {
268: processHandles(selectedHandles());
269: }
270:
271: // check to see if any cancellation request has been made.
272: nHandles -= unregisterHandles();
273:
274: if (nHandles == 0) {
275: synchronized (lock) {
276: if (registerQueue.isEmpty()
277: && cancelQueue.isEmpty()) {
278: worker = null;
279: break;
280: }
281: }
282: }
283: } catch (Throwable e) {
284: ExceptionMonitor.getInstance().exceptionCaught(e);
285:
286: try {
287: Thread.sleep(1000);
288: } catch (InterruptedException e1) {
289: ExceptionMonitor.getInstance().exceptionCaught(
290: e1);
291: }
292: }
293: }
294:
295: if (selectable && isDisposing()) {
296: selectable = false;
297: try {
298: if (createdProcessor) {
299: processor.dispose();
300: }
301: } finally {
302: try {
303: destroy();
304: } catch (Exception e) {
305: ExceptionMonitor.getInstance().exceptionCaught(
306: e);
307: } finally {
308: disposalFuture.setDone();
309: if (createdExecutor) {
310: ((ExecutorService) executor).shutdown();
311: }
312: }
313: }
314: }
315: }
316:
317: /**
318: * This method will process new sessions for the Worker class. All
319: * keys that have had their status updates as per the Selector.selectedKeys()
320: * method will be processed here. Only keys that are ready to accept
321: * connections are handled here.
322: * <p/>
323: * Session objects are created by making new instances of SocketSessionImpl
324: * and passing the session object to the SocketIoProcessor class.
325: */
326: @SuppressWarnings("unchecked")
327: private void processHandles(Iterator<H> handles)
328: throws Exception {
329: while (handles.hasNext()) {
330: H handle = handles.next();
331: handles.remove();
332:
333: T session = accept(processor, handle);
334: if (session == null) {
335: break;
336: }
337:
338: finishSessionInitialization(session, null, null);
339:
340: // add the session to the SocketIoProcessor
341: session.getProcessor().add(session);
342: }
343: }
344: }
345:
346: /**
347: * Sets up the socket communications. Sets items such as:
348: * <p/>
349: * Blocking
350: * Reuse address
351: * Receive buffer size
352: * Bind to listen port
353: * Registers OP_ACCEPT for selector
354: */
355: private int registerHandles() {
356: for (;;) {
357: AcceptorOperationFuture future = registerQueue.poll();
358: if (future == null) {
359: break;
360: }
361:
362: Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
363: List<SocketAddress> localAddresses = future
364: .getLocalAddresses();
365:
366: try {
367: for (SocketAddress a : localAddresses) {
368: H handle = open(a);
369: newHandles.put(localAddress(handle), handle);
370: }
371:
372: boundHandles.putAll(newHandles);
373:
374: // and notify.
375: future.setDone();
376: return newHandles.size();
377: } catch (Exception e) {
378: future.setException(e);
379: } finally {
380: // Roll back if failed to bind all addresses.
381: if (future.getException() != null) {
382: for (H handle : newHandles.values()) {
383: try {
384: close(handle);
385: } catch (Exception e) {
386: ExceptionMonitor.getInstance()
387: .exceptionCaught(e);
388: }
389: }
390: wakeup();
391: }
392: }
393: }
394:
395: return 0;
396: }
397:
398: /**
399: * This method just checks to see if anything has been placed into the
400: * cancellation queue. The only thing that should be in the cancelQueue
401: * is CancellationRequest objects and the only place this happens is in
402: * the doUnbind() method.
403: */
404: private int unregisterHandles() {
405: int cancelledHandles = 0;
406: for (;;) {
407: AcceptorOperationFuture future = cancelQueue.poll();
408: if (future == null) {
409: break;
410: }
411:
412: // close the channels
413: for (SocketAddress a : future.getLocalAddresses()) {
414: H handle = boundHandles.remove(a);
415: if (handle == null) {
416: continue;
417: }
418:
419: try {
420: close(handle);
421: wakeup(); // wake up again to trigger thread death
422: } catch (Throwable e) {
423: ExceptionMonitor.getInstance().exceptionCaught(e);
424: } finally {
425: cancelledHandles++;
426: }
427: }
428:
429: future.setDone();
430: }
431:
432: return cancelledHandles;
433: }
434:
435: public final IoSession newSession(SocketAddress remoteAddress,
436: SocketAddress localAddress) {
437: throw new UnsupportedOperationException();
438: }
439: }
|