001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.net.ConnectException;
023: import java.net.SocketAddress;
024: import java.util.Iterator;
025: import java.util.Queue;
026: import java.util.concurrent.ConcurrentLinkedQueue;
027: import java.util.concurrent.Executor;
028: import java.util.concurrent.ExecutorService;
029: import java.util.concurrent.LinkedBlockingQueue;
030: import java.util.concurrent.RejectedExecutionException;
031: import java.util.concurrent.ThreadPoolExecutor;
032: import java.util.concurrent.TimeUnit;
033: import java.util.concurrent.atomic.AtomicInteger;
034:
035: import org.apache.mina.util.NamePreservingRunnable;
036:
037: /**
038: * @author The Apache MINA Project (dev@mina.apache.org)
039: * @version $Rev: 607163 $, $Date: 2007-12-27 20:20:07 -0700 (Thu, 27 Dec 2007) $
040: */
041: public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
042: extends AbstractIoConnector {
043:
044: private static final AtomicInteger id = new AtomicInteger();
045:
046: private final Object lock = new Object();
047: private final String threadName;
048: private final Executor executor;
049: private final boolean createdExecutor;
050: private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
051: private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
052: private final IoProcessor<T> processor;
053: private final boolean createdProcessor;
054:
055: private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
056: private volatile boolean selectable;
057: private Worker worker;
058:
059: protected AbstractPollingIoConnector(IoSessionConfig sessionConfig,
060: Class<? extends IoProcessor<T>> processorClass) {
061: this (sessionConfig, null, new SimpleIoProcessorPool<T>(
062: processorClass), true);
063: }
064:
065: protected AbstractPollingIoConnector(IoSessionConfig sessionConfig,
066: Class<? extends IoProcessor<T>> processorClass,
067: int processorCount) {
068: this (sessionConfig, null, new SimpleIoProcessorPool<T>(
069: processorClass, processorCount), true);
070: }
071:
072: protected AbstractPollingIoConnector(IoSessionConfig sessionConfig,
073: IoProcessor<T> processor) {
074: this (sessionConfig, null, processor, false);
075: }
076:
077: protected AbstractPollingIoConnector(IoSessionConfig sessionConfig,
078: Executor executor, IoProcessor<T> processor) {
079: this (sessionConfig, executor, processor, false);
080: }
081:
082: private AbstractPollingIoConnector(IoSessionConfig sessionConfig,
083: Executor executor, IoProcessor<T> processor,
084: boolean createdProcessor) {
085: super (sessionConfig);
086:
087: if (processor == null) {
088: throw new NullPointerException("processor");
089: }
090:
091: if (executor == null) {
092: this .executor = new ThreadPoolExecutor(1, 1, 1L,
093: TimeUnit.SECONDS,
094: new LinkedBlockingQueue<Runnable>());
095: this .createdExecutor = true;
096: } else {
097: this .executor = executor;
098: this .createdExecutor = false;
099: }
100:
101: this .threadName = getClass().getSimpleName() + '-'
102: + id.incrementAndGet();
103: this .processor = processor;
104: this .createdProcessor = createdProcessor;
105:
106: try {
107: init();
108: selectable = true;
109: } catch (RuntimeException e) {
110: throw e;
111: } catch (Exception e) {
112: throw new RuntimeIoException("Failed to initialize.", e);
113: } finally {
114: if (!selectable) {
115: try {
116: destroy();
117: } catch (Exception e) {
118: ExceptionMonitor.getInstance().exceptionCaught(e);
119: }
120: }
121: }
122: }
123:
124: protected abstract void init() throws Exception;
125:
126: protected abstract void destroy() throws Exception;
127:
128: protected abstract H newHandle(SocketAddress localAddress)
129: throws Exception;
130:
131: protected abstract boolean connect(H handle,
132: SocketAddress remoteAddress) throws Exception;
133:
134: protected abstract boolean finishConnect(H handle) throws Exception;
135:
136: protected abstract T newSession(IoProcessor<T> processor, H handle)
137: throws Exception;
138:
139: protected abstract void close(H handle) throws Exception;
140:
141: protected abstract void wakeup();
142:
143: protected abstract boolean select(int timeout) throws Exception;
144:
145: protected abstract Iterator<H> selectedHandles();
146:
147: protected abstract Iterator<H> allHandles();
148:
149: protected abstract void register(H handle, ConnectionRequest request)
150: throws Exception;
151:
152: protected abstract ConnectionRequest connectionRequest(H handle);
153:
154: @Override
155: protected final IoFuture dispose0() throws Exception {
156: if (!disposalFuture.isDone()) {
157: try {
158: startupWorker();
159: wakeup();
160: } catch (RejectedExecutionException e) {
161: if (createdExecutor) {
162: // Ignore.
163: } else {
164: throw e;
165: }
166: }
167: }
168: return disposalFuture;
169: }
170:
171: @Override
172: @SuppressWarnings("unchecked")
173: protected final ConnectFuture connect0(
174: SocketAddress remoteAddress,
175: SocketAddress localAddress,
176: IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
177: H handle = null;
178: boolean success = false;
179: try {
180: handle = newHandle(localAddress);
181: if (connect(handle, remoteAddress)) {
182: ConnectFuture future = new DefaultConnectFuture();
183: T session = newSession(processor, handle);
184: finishSessionInitialization(session, future,
185: sessionInitializer);
186: // Forward the remaining process to the IoProcessor.
187: session.getProcessor().add(session);
188: success = true;
189: return future;
190: }
191:
192: success = true;
193: } catch (Exception e) {
194: return DefaultConnectFuture.newFailedFuture(e);
195: } finally {
196: if (!success && handle != null) {
197: try {
198: close(handle);
199: } catch (Exception e) {
200: ExceptionMonitor.getInstance().exceptionCaught(e);
201: }
202: }
203: }
204:
205: ConnectionRequest request = new ConnectionRequest(handle,
206: sessionInitializer);
207: connectQueue.add(request);
208: startupWorker();
209: wakeup();
210:
211: return request;
212: }
213:
214: private void startupWorker() {
215: if (!selectable) {
216: connectQueue.clear();
217: cancelQueue.clear();
218: }
219:
220: synchronized (lock) {
221: if (worker == null) {
222: worker = new Worker();
223: executor.execute(new NamePreservingRunnable(worker,
224: threadName));
225: }
226: }
227: }
228:
229: private int registerNew() {
230: int nHandles = 0;
231: for (;;) {
232: ConnectionRequest req = connectQueue.poll();
233: if (req == null) {
234: break;
235: }
236:
237: H handle = req.handle;
238: try {
239: register(handle, req);
240: nHandles++;
241: } catch (Exception e) {
242: req.setException(e);
243: try {
244: close(handle);
245: } catch (Exception e2) {
246: ExceptionMonitor.getInstance().exceptionCaught(e2);
247: }
248: }
249: }
250: return nHandles;
251: }
252:
253: private int cancelKeys() {
254: int nHandles = 0;
255: for (;;) {
256: ConnectionRequest req = cancelQueue.poll();
257: if (req == null) {
258: break;
259: }
260:
261: H handle = req.handle;
262: try {
263: close(handle);
264: } catch (Exception e) {
265: ExceptionMonitor.getInstance().exceptionCaught(e);
266: } finally {
267: nHandles++;
268: }
269: }
270: return nHandles;
271: }
272:
273: @SuppressWarnings("unchecked")
274: private int processSessions(Iterator<H> handlers) {
275: int nHandles = 0;
276: while (handlers.hasNext()) {
277: H handle = handlers.next();
278: handlers.remove();
279:
280: ConnectionRequest entry = connectionRequest(handle);
281: boolean success = false;
282: try {
283: if (finishConnect(handle)) {
284: T session = newSession(processor, handle);
285: finishSessionInitialization(session, entry, entry
286: .getSessionInitializer());
287: // Forward the remaining process to the IoProcessor.
288: session.getProcessor().add(session);
289: nHandles++;
290: }
291: success = true;
292: } catch (Throwable e) {
293: entry.setException(e);
294: } finally {
295: if (!success) {
296: cancelQueue.offer(entry);
297: }
298: }
299: }
300: return nHandles;
301: }
302:
303: private void processTimedOutSessions(Iterator<H> handles) {
304: long currentTime = System.currentTimeMillis();
305:
306: while (handles.hasNext()) {
307: H handle = handles.next();
308: ConnectionRequest entry = connectionRequest(handle);
309:
310: if (currentTime >= entry.deadline) {
311: entry.setException(new ConnectException(
312: "Connection timed out."));
313: cancelQueue.offer(entry);
314: }
315: }
316: }
317:
318: private class Worker implements Runnable {
319:
320: public void run() {
321: int nHandles = 0;
322: while (selectable) {
323: try {
324: boolean selected = select(1000);
325:
326: nHandles += registerNew();
327:
328: if (selected) {
329: nHandles -= processSessions(selectedHandles());
330: }
331:
332: processTimedOutSessions(allHandles());
333:
334: nHandles -= cancelKeys();
335:
336: if (nHandles == 0) {
337: synchronized (lock) {
338: if (connectQueue.isEmpty()) {
339: worker = null;
340: break;
341: }
342: }
343: }
344: } catch (Throwable e) {
345: ExceptionMonitor.getInstance().exceptionCaught(e);
346:
347: try {
348: Thread.sleep(1000);
349: } catch (InterruptedException e1) {
350: ExceptionMonitor.getInstance().exceptionCaught(
351: e1);
352: }
353: }
354: }
355:
356: if (selectable && isDisposing()) {
357: selectable = false;
358: try {
359: if (createdProcessor) {
360: processor.dispose();
361: }
362: } finally {
363: try {
364: destroy();
365: } catch (Exception e) {
366: ExceptionMonitor.getInstance().exceptionCaught(
367: e);
368: } finally {
369: disposalFuture.setDone();
370: if (createdExecutor) {
371: ((ExecutorService) executor).shutdown();
372: }
373: }
374: }
375: }
376: }
377: }
378:
379: protected final class ConnectionRequest extends
380: DefaultConnectFuture {
381: private final H handle;
382: private final long deadline;
383: private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
384:
385: public ConnectionRequest(H handle,
386: IoSessionInitializer<? extends ConnectFuture> callback) {
387: this .handle = handle;
388: long timeout = getConnectTimeoutMillis();
389: if (timeout <= 0L) {
390: this .deadline = Long.MAX_VALUE;
391: } else {
392: this .deadline = System.currentTimeMillis() + timeout;
393: }
394: this .sessionInitializer = callback;
395: }
396:
397: public H getHandle() {
398: return handle;
399: }
400:
401: public long getDeadline() {
402: return deadline;
403: }
404:
405: public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
406: return sessionInitializer;
407: }
408:
409: @Override
410: public void cancel() {
411: super.cancel();
412: cancelQueue.add(this);
413: startupWorker();
414: wakeup();
415: }
416: }
417: }
|