001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.net.SocketAddress;
023: import java.util.Collections;
024: import java.util.HashMap;
025: import java.util.HashSet;
026: import java.util.Iterator;
027: import java.util.List;
028: import java.util.Map;
029: import java.util.Queue;
030: import java.util.Set;
031: import java.util.concurrent.ConcurrentLinkedQueue;
032: import java.util.concurrent.Executor;
033: import java.util.concurrent.ExecutorService;
034: import java.util.concurrent.LinkedBlockingQueue;
035: import java.util.concurrent.RejectedExecutionException;
036: import java.util.concurrent.ThreadPoolExecutor;
037: import java.util.concurrent.TimeUnit;
038: import java.util.concurrent.atomic.AtomicInteger;
039:
040: import org.apache.mina.util.NamePreservingRunnable;
041:
042: /**
043: * {@link IoAcceptor} for datagram transport (UDP/IP).
044: *
045: * @author The Apache MINA Project (dev@mina.apache.org)
046: * @version $Rev: 605069 $, $Date: 2007-12-17 19:47:03 -0700 (Mon, 17 Dec 2007) $
047: */
048: public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
049: extends AbstractIoAcceptor {
050:
051: private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
052:
053: private static final AtomicInteger id = new AtomicInteger();
054:
055: private final Object lock = new Object();
056: private final Executor executor;
057: private final boolean createdExecutor;
058: private final String threadName;
059: private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
060: private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
061: private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
062: private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
063: private final Map<SocketAddress, H> boundHandles = Collections
064: .synchronizedMap(new HashMap<SocketAddress, H>());
065:
066: private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
067:
068: private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
069: private volatile boolean selectable;
070: private Worker worker;
071: private long lastIdleCheckTime;
072:
073: /**
074: * Creates a new instance.
075: */
076: protected AbstractPollingConnectionlessIoAcceptor(
077: IoSessionConfig sessionConfig) {
078: this (sessionConfig, null);
079: }
080:
081: /**
082: * Creates a new instance.
083: */
084: protected AbstractPollingConnectionlessIoAcceptor(
085: IoSessionConfig sessionConfig, Executor executor) {
086: super (sessionConfig);
087:
088: threadName = getClass().getSimpleName() + '-'
089: + id.incrementAndGet();
090:
091: if (executor == null) {
092: this .executor = new ThreadPoolExecutor(1, 1, 1L,
093: TimeUnit.SECONDS,
094: new LinkedBlockingQueue<Runnable>());
095: this .createdExecutor = true;
096: } else {
097: this .executor = executor;
098: this .createdExecutor = false;
099: }
100:
101: try {
102: init();
103: selectable = true;
104: } catch (RuntimeException e) {
105: throw e;
106: } catch (Exception e) {
107: throw new RuntimeIoException("Failed to initialize.", e);
108: } finally {
109: if (!selectable) {
110: try {
111: destroy();
112: } catch (Exception e) {
113: ExceptionMonitor.getInstance().exceptionCaught(e);
114: }
115: }
116: }
117: }
118:
119: protected abstract void init() throws Exception;
120:
121: protected abstract void destroy() throws Exception;
122:
123: protected abstract boolean select(int timeout) throws Exception;
124:
125: protected abstract void wakeup();
126:
127: protected abstract Iterator<H> selectedHandles();
128:
129: protected abstract H open(SocketAddress localAddress)
130: throws Exception;
131:
132: protected abstract void close(H handle) throws Exception;
133:
134: protected abstract SocketAddress localAddress(H handle)
135: throws Exception;
136:
137: protected abstract boolean isReadable(H handle);
138:
139: protected abstract boolean isWritable(H handle);
140:
141: protected abstract SocketAddress receive(H handle, IoBuffer buffer)
142: throws Exception;
143:
144: protected abstract int send(T session, IoBuffer buffer,
145: SocketAddress remoteAddress) throws Exception;
146:
147: protected abstract T newSession(IoProcessor<T> processor, H handle,
148: SocketAddress remoteAddress) throws Exception;
149:
150: protected abstract void setInterestedInWrite(T session,
151: boolean interested) throws Exception;
152:
153: @Override
154: protected IoFuture dispose0() throws Exception {
155: unbind();
156: if (!disposalFuture.isDone()) {
157: try {
158: startupWorker();
159: wakeup();
160: } catch (RejectedExecutionException e) {
161: if (createdExecutor) {
162: // Ignore.
163: } else {
164: throw e;
165: }
166: }
167: }
168: return disposalFuture;
169: }
170:
171: @Override
172: protected final Set<SocketAddress> bind0(
173: List<? extends SocketAddress> localAddresses)
174: throws Exception {
175: AcceptorOperationFuture request = new AcceptorOperationFuture(
176: localAddresses);
177:
178: registerQueue.add(request);
179: startupWorker();
180: wakeup();
181:
182: request.awaitUninterruptibly();
183:
184: if (request.getException() != null) {
185: throw request.getException();
186: }
187:
188: Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
189: for (H handle : boundHandles.values()) {
190: newLocalAddresses.add(localAddress(handle));
191: }
192: return newLocalAddresses;
193: }
194:
195: @Override
196: protected final void unbind0(
197: List<? extends SocketAddress> localAddresses)
198: throws Exception {
199: AcceptorOperationFuture request = new AcceptorOperationFuture(
200: localAddresses);
201:
202: cancelQueue.add(request);
203: startupWorker();
204: wakeup();
205:
206: request.awaitUninterruptibly();
207:
208: if (request.getException() != null) {
209: throw request.getException();
210: }
211: }
212:
213: public final IoSession newSession(SocketAddress remoteAddress,
214: SocketAddress localAddress) {
215: if (isDisposing()) {
216: throw new IllegalStateException("Already disposed.");
217: }
218:
219: if (remoteAddress == null) {
220: throw new NullPointerException("remoteAddress");
221: }
222:
223: synchronized (bindLock) {
224: if (!isActive()) {
225: throw new IllegalStateException(
226: "Can't create a session from a unbound service.");
227: }
228:
229: try {
230: return newSessionWithoutLock(remoteAddress,
231: localAddress);
232: } catch (RuntimeException e) {
233: throw e;
234: } catch (Error e) {
235: throw e;
236: } catch (Exception e) {
237: throw new RuntimeIoException(
238: "Failed to create a session.", e);
239: }
240: }
241: }
242:
243: private IoSession newSessionWithoutLock(
244: SocketAddress remoteAddress, SocketAddress localAddress)
245: throws Exception {
246: H handle = boundHandles.get(localAddress);
247: if (handle == null) {
248: throw new IllegalArgumentException(
249: "Unknown local address: " + localAddress);
250: }
251:
252: IoSession session;
253: IoSessionRecycler sessionRecycler = getSessionRecycler();
254: synchronized (sessionRecycler) {
255: session = sessionRecycler.recycle(localAddress,
256: remoteAddress);
257: if (session != null) {
258: return session;
259: }
260:
261: // If a new session needs to be created.
262: T newSession = newSession(processor, handle, remoteAddress);
263: getSessionRecycler().put(newSession);
264: session = newSession;
265: }
266:
267: finishSessionInitialization(session, null, null);
268:
269: try {
270: this .getFilterChainBuilder().buildFilterChain(
271: session.getFilterChain());
272: getListeners().fireSessionCreated(session);
273: } catch (Throwable t) {
274: ExceptionMonitor.getInstance().exceptionCaught(t);
275: }
276:
277: return session;
278: }
279:
280: public final IoSessionRecycler getSessionRecycler() {
281: return sessionRecycler;
282: }
283:
284: public final void setSessionRecycler(
285: IoSessionRecycler sessionRecycler) {
286: synchronized (bindLock) {
287: if (isActive()) {
288: throw new IllegalStateException(
289: "sessionRecycler can't be set while the acceptor is bound.");
290: }
291:
292: if (sessionRecycler == null) {
293: sessionRecycler = DEFAULT_RECYCLER;
294: }
295: this .sessionRecycler = sessionRecycler;
296: }
297: }
298:
299: private class ConnectionlessAcceptorProcessor implements
300: IoProcessor<T> {
301:
302: public void add(T session) {
303: }
304:
305: public void flush(T session) {
306: if (scheduleFlush(session)) {
307: wakeup();
308: }
309: }
310:
311: public void remove(T session) {
312: getSessionRecycler().remove(session);
313: getListeners().fireSessionDestroyed(session);
314: }
315:
316: public void updateTrafficMask(T session) {
317: throw new UnsupportedOperationException();
318: }
319:
320: public void dispose() {
321: }
322:
323: public boolean isDisposed() {
324: return false;
325: }
326:
327: public boolean isDisposing() {
328: return false;
329: }
330: }
331:
332: private void startupWorker() {
333: if (!selectable) {
334: registerQueue.clear();
335: cancelQueue.clear();
336: flushingSessions.clear();
337: }
338:
339: synchronized (lock) {
340: if (worker == null) {
341: worker = new Worker();
342: executor.execute(new NamePreservingRunnable(worker,
343: threadName));
344: }
345: }
346: }
347:
348: private boolean scheduleFlush(T session) {
349: if (session.setScheduledForFlush(true)) {
350: flushingSessions.add(session);
351: return true;
352: } else {
353: return false;
354: }
355: }
356:
357: private class Worker implements Runnable {
358: public void run() {
359: int nHandles = 0;
360: lastIdleCheckTime = System.currentTimeMillis();
361:
362: while (selectable) {
363: try {
364: boolean selected = select(1000);
365:
366: nHandles += registerHandles();
367:
368: if (selected) {
369: processReadySessions(selectedHandles());
370: }
371:
372: flushSessions();
373: nHandles -= unregisterHandles();
374:
375: notifyIdleSessions();
376:
377: if (nHandles == 0) {
378: synchronized (lock) {
379: if (registerQueue.isEmpty()
380: && cancelQueue.isEmpty()) {
381: worker = null;
382: break;
383: }
384: }
385: }
386: } catch (Exception e) {
387: ExceptionMonitor.getInstance().exceptionCaught(e);
388:
389: try {
390: Thread.sleep(1000);
391: } catch (InterruptedException e1) {
392: }
393: }
394: }
395:
396: if (selectable && isDisposing()) {
397: selectable = false;
398: try {
399: destroy();
400: } catch (Exception e) {
401: ExceptionMonitor.getInstance().exceptionCaught(e);
402: } finally {
403: disposalFuture.setValue(true);
404: if (createdExecutor) {
405: ((ExecutorService) executor).shutdown();
406: }
407: }
408: }
409: }
410: }
411:
412: @SuppressWarnings("unchecked")
413: private void processReadySessions(Iterator<H> handles) {
414: while (handles.hasNext()) {
415: H h = handles.next();
416: handles.remove();
417: try {
418: if (isReadable(h)) {
419: readHandle(h);
420: }
421:
422: if (isWritable(h)) {
423: for (IoSession session : getManagedSessions()) {
424: scheduleFlush((T) session);
425: }
426: }
427: } catch (Throwable t) {
428: ExceptionMonitor.getInstance().exceptionCaught(t);
429: }
430: }
431: }
432:
433: private void readHandle(H handle) throws Exception {
434: IoBuffer readBuf = IoBuffer.allocate(getSessionConfig()
435: .getReadBufferSize());
436:
437: SocketAddress remoteAddress = receive(handle, readBuf);
438: if (remoteAddress != null) {
439: IoSession session = newSessionWithoutLock(remoteAddress,
440: localAddress(handle));
441:
442: readBuf.flip();
443:
444: IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
445: newBuf.put(readBuf);
446: newBuf.flip();
447:
448: session.getFilterChain().fireMessageReceived(newBuf);
449: }
450: }
451:
452: private void flushSessions() {
453: for (;;) {
454: T session = flushingSessions.poll();
455: if (session == null) {
456: break;
457: }
458:
459: session.setScheduledForFlush(false);
460:
461: try {
462: boolean flushedAll = flush(session);
463: if (flushedAll
464: && !session.getWriteRequestQueue().isEmpty(
465: session)
466: && !session.isScheduledForFlush()) {
467: scheduleFlush(session);
468: }
469: } catch (Exception e) {
470: session.getFilterChain().fireExceptionCaught(e);
471: }
472: }
473: }
474:
475: private boolean flush(T session) throws Exception {
476: // Clear OP_WRITE
477: setInterestedInWrite(session, false);
478:
479: WriteRequestQueue writeRequestQueue = session
480: .getWriteRequestQueue();
481:
482: int maxWrittenBytes = session.getConfig()
483: .getMaxReadBufferSize()
484: + (session.getConfig().getMaxReadBufferSize() >>> 1);
485:
486: int writtenBytes = 0;
487: for (;;) {
488: WriteRequest req = session.getCurrentWriteRequest();
489: if (req == null) {
490: req = writeRequestQueue.poll(session);
491: if (req == null) {
492: break;
493: }
494: session.setCurrentWriteRequest(req);
495: }
496:
497: IoBuffer buf = (IoBuffer) req.getMessage();
498: if (buf.remaining() == 0) {
499: // Clear and fire event
500: session.setCurrentWriteRequest(null);
501: buf.reset();
502: session.getFilterChain().fireMessageSent(req);
503: continue;
504: }
505:
506: SocketAddress destination = req.getDestination();
507: if (destination == null) {
508: destination = session.getRemoteAddress();
509: }
510:
511: int localWrittenBytes = send(session, buf, destination);
512: if (localWrittenBytes == 0
513: || writtenBytes >= maxWrittenBytes) {
514: // Kernel buffer is full or wrote too much
515: setInterestedInWrite(session, true);
516: return false;
517: } else {
518: setInterestedInWrite(session, false);
519:
520: // Clear and fire event
521: session.setCurrentWriteRequest(null);
522: writtenBytes += localWrittenBytes;
523: buf.reset();
524: session.getFilterChain().fireMessageSent(req);
525: }
526: }
527:
528: return true;
529: }
530:
531: private int registerHandles() {
532: for (;;) {
533: AcceptorOperationFuture req = registerQueue.poll();
534: if (req == null) {
535: break;
536: }
537:
538: Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
539: List<SocketAddress> localAddresses = req
540: .getLocalAddresses();
541: try {
542: for (SocketAddress a : localAddresses) {
543: H handle = open(a);
544: newHandles.put(localAddress(handle), handle);
545: }
546: boundHandles.putAll(newHandles);
547:
548: getListeners().fireServiceActivated();
549: req.setDone();
550: return newHandles.size();
551: } catch (Exception e) {
552: req.setException(e);
553: } finally {
554: // Roll back if failed to bind all addresses.
555: if (req.getException() != null) {
556: for (H handle : newHandles.values()) {
557: try {
558: close(handle);
559: } catch (Exception e) {
560: ExceptionMonitor.getInstance()
561: .exceptionCaught(e);
562: }
563: }
564: wakeup();
565: }
566: }
567: }
568:
569: return 0;
570: }
571:
572: private int unregisterHandles() {
573: int nHandles = 0;
574: for (;;) {
575: AcceptorOperationFuture request = cancelQueue.poll();
576: if (request == null) {
577: break;
578: }
579:
580: // close the channels
581: for (SocketAddress a : request.getLocalAddresses()) {
582: H handle = boundHandles.remove(a);
583: if (handle == null) {
584: continue;
585: }
586:
587: try {
588: close(handle);
589: wakeup(); // wake up again to trigger thread death
590: } catch (Throwable e) {
591: ExceptionMonitor.getInstance().exceptionCaught(e);
592: } finally {
593: nHandles++;
594: }
595: }
596:
597: request.setDone();
598: }
599:
600: return nHandles;
601: }
602:
603: private void notifyIdleSessions() {
604: // process idle sessions
605: long currentTime = System.currentTimeMillis();
606: if (currentTime - lastIdleCheckTime >= 1000) {
607: lastIdleCheckTime = currentTime;
608: IdleStatusChecker.notifyIdleness(getListeners()
609: .getManagedSessions().iterator(), currentTime);
610: }
611: }
612: }
|