001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.io.IOException;
023: import java.nio.channels.SelectionKey;
024: import java.util.ArrayList;
025: import java.util.Iterator;
026: import java.util.List;
027: import java.util.Map;
028: import java.util.Queue;
029: import java.util.concurrent.ConcurrentLinkedQueue;
030: import java.util.concurrent.Executor;
031: import java.util.concurrent.atomic.AtomicInteger;
032:
033: import org.apache.mina.util.CopyOnWriteMap;
034: import org.apache.mina.util.NamePreservingRunnable;
035:
036: /**
037: * An abstract implementation of {@link IoProcessor} which helps
038: * transport developers to write an {@link IoProcessor} easily.
039: *
040: * @author The Apache MINA Project (dev@mina.apache.org)
041: * @version $Rev: 627803 $, $Date: 2008-02-14 10:03:14 -0700 (Thu, 14 Feb 2008) $
042: */
043: public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
044: implements IoProcessor<T> {
045: /**
046: * The maximum loop count for a write operation until
047: * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
048: * It is similar to what a spin lock is for in concurrency programming.
049: * It improves memory utilization and write throughput significantly.
050: */
051: private static final int WRITE_SPIN_COUNT = 256;
052:
053: private static final Map<Class<?>, AtomicInteger> threadIds = new CopyOnWriteMap<Class<?>, AtomicInteger>();
054:
055: private final Object lock = new Object();
056: private final String threadName;
057: private final Executor executor;
058:
059: private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
060: private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
061: private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
062: private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
063:
064: private Worker worker;
065: private long lastIdleCheckTime;
066:
067: private final Object disposalLock = new Object();
068: private volatile boolean disposing;
069: private volatile boolean disposed;
070: private final DefaultIoFuture disposalFuture = new DefaultIoFuture(
071: null);
072:
073: protected AbstractPollingIoProcessor(Executor executor) {
074: if (executor == null) {
075: throw new NullPointerException("executor");
076: }
077:
078: this .threadName = nextThreadName();
079: this .executor = executor;
080: }
081:
082: private String nextThreadName() {
083: Class<?> cls = getClass();
084: AtomicInteger threadId = threadIds.get(cls);
085: int newThreadId;
086: if (threadId == null) {
087: newThreadId = 1;
088: threadIds.put(cls, new AtomicInteger(newThreadId));
089: } else {
090: newThreadId = threadId.incrementAndGet();
091: }
092:
093: return cls.getSimpleName() + '-' + newThreadId;
094: }
095:
096: public final boolean isDisposing() {
097: return disposing;
098: }
099:
100: public final boolean isDisposed() {
101: return disposed;
102: }
103:
104: public final void dispose() {
105: if (disposed) {
106: return;
107: }
108:
109: synchronized (disposalLock) {
110: if (!disposing) {
111: disposing = true;
112: startupWorker();
113: }
114: }
115:
116: disposalFuture.awaitUninterruptibly();
117: disposed = true;
118: }
119:
120: protected abstract void dispose0() throws Exception;
121:
122: /**
123: * poll those sessions for the given timeout
124: * @param timeout milliseconds before the call timeout if no event appear
125: * @return true if at least a session is ready for read or for write
126: * @throws Exception if some low level IO error occurs
127: */
128: protected abstract boolean select(int timeout) throws Exception;
129:
130: protected abstract void wakeup();
131:
132: protected abstract Iterator<T> allSessions();
133:
134: protected abstract Iterator<T> selectedSessions();
135:
136: protected abstract SessionState state(T session);
137:
138: /**
139: * Is the session ready for writing
140: * @param session the session queried
141: * @return true is ready, false if not ready
142: */
143: protected abstract boolean isWritable(T session);
144:
145: /**
146: * Is the session ready for reading
147: * @param session the session queried
148: * @return true is ready, false if not ready
149: */
150: protected abstract boolean isReadable(T session);
151:
152: /**
153: * register a session for writing
154: * @param session the session registered
155: * @param interested true for registering, false for removing
156: */
157: protected abstract void setInterestedInWrite(T session,
158: boolean interested) throws Exception;
159:
160: /**
161: * register a session for reading
162: * @param session the session registered
163: * @param interested true for registering, false for removing
164: */
165: protected abstract void setInterestedInRead(T session,
166: boolean interested) throws Exception;
167:
168: /**
169: * is this session registered for reading
170: * @param session the session queried
171: * @return true is registered for reading
172: */
173: protected abstract boolean isInterestedInRead(T session);
174:
175: /**
176: * is this session registered for writing
177: * @param session the session queried
178: * @return true is registered for writing
179: */
180: protected abstract boolean isInterestedInWrite(T session);
181:
182: protected abstract void init(T session) throws Exception;
183:
184: protected abstract void destroy(T session) throws Exception;
185:
186: protected abstract int read(T session, IoBuffer buf)
187: throws Exception;
188:
189: protected abstract int write(T session, IoBuffer buf, int length)
190: throws Exception;
191:
192: protected abstract int transferFile(T session, FileRegion region,
193: int length) throws Exception;
194:
195: public final void add(T session) {
196: if (isDisposing()) {
197: throw new IllegalStateException("Already disposed.");
198: }
199:
200: newSessions.add(session);
201: startupWorker();
202: }
203:
204: public final void remove(T session) {
205: scheduleRemove(session);
206: startupWorker();
207: }
208:
209: private void scheduleRemove(T session) {
210: removingSessions.add(session);
211: }
212:
213: public final void flush(T session) {
214: boolean needsWakeup = flushingSessions.isEmpty();
215: if (scheduleFlush(session) && needsWakeup) {
216: wakeup();
217: }
218: }
219:
220: private boolean scheduleFlush(T session) {
221: if (session.setScheduledForFlush(true)) {
222: flushingSessions.add(session);
223: return true;
224: }
225: return false;
226: }
227:
228: public final void updateTrafficMask(T session) {
229: scheduleTrafficControl(session);
230: wakeup();
231: }
232:
233: private void scheduleTrafficControl(T session) {
234: trafficControllingSessions.add(session);
235: }
236:
237: private void startupWorker() {
238: synchronized (lock) {
239: if (worker == null) {
240: worker = new Worker();
241: executor.execute(new NamePreservingRunnable(worker,
242: threadName));
243: }
244: }
245: wakeup();
246: }
247:
248: private int add() {
249: int addedSessions = 0;
250: for (;;) {
251: T session = newSessions.poll();
252:
253: if (session == null) {
254: break;
255: }
256:
257: if (addNow(session)) {
258: addedSessions++;
259: }
260: }
261:
262: return addedSessions;
263: }
264:
265: private boolean addNow(T session) {
266:
267: boolean registered = false;
268: boolean notified = false;
269: try {
270: init(session);
271: registered = true;
272:
273: // Build the filter chain of this session.
274: session.getService().getFilterChainBuilder()
275: .buildFilterChain(session.getFilterChain());
276:
277: // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
278: // in AbstractIoFilterChain.fireSessionOpened().
279: ((AbstractIoService) session.getService()).getListeners()
280: .fireSessionCreated(session);
281: notified = true;
282: } catch (Throwable e) {
283: if (notified) {
284: // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
285: // and call ConnectFuture.setException().
286: scheduleRemove(session);
287: session.getFilterChain().fireExceptionCaught(e);
288: wakeup();
289: } else {
290: ExceptionMonitor.getInstance().exceptionCaught(e);
291: try {
292: destroy(session);
293: } catch (Exception e1) {
294: ExceptionMonitor.getInstance().exceptionCaught(e1);
295: } finally {
296: registered = false;
297: }
298: }
299: }
300: return registered;
301: }
302:
303: private int remove() {
304: int removedSessions = 0;
305: for (;;) {
306: T session = removingSessions.poll();
307:
308: if (session == null) {
309: break;
310: }
311:
312: SessionState state = state(session);
313: switch (state) {
314: case OPEN:
315: if (removeNow(session)) {
316: removedSessions++;
317: }
318: break;
319: case CLOSED:
320: // Skip if channel is already closed
321: break;
322: case PREPARING:
323: // Retry later if session is not yet fully initialized.
324: // (In case that Session.close() is called before addSession() is processed)
325: scheduleRemove(session);
326: return removedSessions;
327: default:
328: throw new IllegalStateException(String.valueOf(state));
329: }
330: }
331:
332: return removedSessions;
333: }
334:
335: private boolean removeNow(T session) {
336: clearWriteRequestQueue(session);
337:
338: try {
339: destroy(session);
340: return true;
341: } catch (Exception e) {
342: session.getFilterChain().fireExceptionCaught(e);
343: } finally {
344: clearWriteRequestQueue(session);
345: ((AbstractIoService) session.getService()).getListeners()
346: .fireSessionDestroyed(session);
347: }
348: return false;
349: }
350:
351: private void clearWriteRequestQueue(T session) {
352: WriteRequestQueue writeRequestQueue = session
353: .getWriteRequestQueue();
354: WriteRequest req;
355:
356: List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
357:
358: if ((req = writeRequestQueue.poll(session)) != null) {
359: Object m = req.getMessage();
360: if (m instanceof IoBuffer) {
361: IoBuffer buf = (IoBuffer) req.getMessage();
362:
363: // The first unwritten empty buffer must be
364: // forwarded to the filter chain.
365: if (buf.hasRemaining()) {
366: buf.reset();
367: failedRequests.add(req);
368: } else {
369: session.getFilterChain().fireMessageSent(req);
370: }
371: } else {
372: failedRequests.add(req);
373: }
374:
375: // Discard others.
376: while ((req = writeRequestQueue.poll(session)) != null) {
377: failedRequests.add(req);
378: }
379: }
380:
381: // Create an exception and notify.
382: if (!failedRequests.isEmpty()) {
383: WriteToClosedSessionException cause = new WriteToClosedSessionException(
384: failedRequests);
385: for (WriteRequest r : failedRequests) {
386: session.decreaseScheduledBytesAndMessages(r);
387: r.getFuture().setException(cause);
388: }
389: session.getFilterChain().fireExceptionCaught(cause);
390: }
391: }
392:
393: private void process() throws Exception {
394: for (Iterator<T> i = selectedSessions(); i.hasNext();) {
395: process(i.next());
396: i.remove();
397: }
398: }
399:
400: private void process(T session) {
401:
402: if (isReadable(session)
403: && session.getTrafficMask().isReadable()) {
404: read(session);
405: }
406:
407: if (isWritable(session)
408: && session.getTrafficMask().isWritable()) {
409: scheduleFlush(session);
410: }
411: }
412:
413: private void read(T session) {
414: IoSessionConfig config = session.getConfig();
415: IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
416:
417: final boolean hasFragmentation = session.getTransportMetadata()
418: .hasFragmentation();
419:
420: try {
421: int readBytes = 0;
422: int ret;
423:
424: try {
425: if (hasFragmentation) {
426: while ((ret = read(session, buf)) > 0) {
427: readBytes += ret;
428: if (!buf.hasRemaining()) {
429: break;
430: }
431: }
432: } else {
433: ret = read(session, buf);
434: if (ret > 0) {
435: readBytes = ret;
436: }
437: }
438: } finally {
439: buf.flip();
440: }
441:
442: if (readBytes > 0) {
443: session.getFilterChain().fireMessageReceived(buf);
444: buf = null;
445:
446: if (hasFragmentation) {
447: if (readBytes << 1 < config.getReadBufferSize()) {
448: session.decreaseReadBufferSize();
449: } else if (readBytes == config.getReadBufferSize()) {
450: session.increaseReadBufferSize();
451: }
452: }
453: }
454: if (ret < 0) {
455: scheduleRemove(session);
456: }
457: } catch (Throwable e) {
458: if (e instanceof IOException) {
459: scheduleRemove(session);
460: }
461: session.getFilterChain().fireExceptionCaught(e);
462: }
463: }
464:
465: private void notifyIdleSessions() throws Exception {
466: // process idle sessions
467: long currentTime = System.currentTimeMillis();
468: if (currentTime - lastIdleCheckTime >= 1000) {
469: lastIdleCheckTime = currentTime;
470: IdleStatusChecker
471: .notifyIdleness(allSessions(), currentTime);
472: }
473: }
474:
475: private void flush() {
476: for (;;) {
477: T session = flushingSessions.poll();
478:
479: if (session == null) {
480: break;
481: }
482:
483: session.setScheduledForFlush(false);
484: SessionState state = state(session);
485: switch (state) {
486: case OPEN:
487: try {
488: boolean flushedAll = flushNow(session);
489: if (flushedAll
490: && !session.getWriteRequestQueue().isEmpty(
491: session)
492: && !session.isScheduledForFlush()) {
493: scheduleFlush(session);
494: }
495: } catch (Exception e) {
496: scheduleRemove(session);
497: session.getFilterChain().fireExceptionCaught(e);
498: }
499: break;
500: case CLOSED:
501: // Skip if the channel is already closed.
502: break;
503: case PREPARING:
504: // Retry later if session is not yet fully initialized.
505: // (In case that Session.write() is called before addSession() is processed)
506: scheduleFlush(session);
507: return;
508: default:
509: throw new IllegalStateException(String.valueOf(state));
510: }
511: }
512: }
513:
514: private boolean flushNow(T session) {
515: if (!session.isConnected()) {
516: scheduleRemove(session);
517: return false;
518: }
519:
520: final boolean hasFragmentation = session.getTransportMetadata()
521: .hasFragmentation();
522:
523: try {
524: // Clear OP_WRITE
525: setInterestedInWrite(session, false);
526:
527: WriteRequestQueue writeRequestQueue = session
528: .getWriteRequestQueue();
529:
530: // Set limitation for the number of written bytes for read-write
531: // fairness. I used maxReadBufferSize * 3 / 2, which yields best
532: // performance in my experience while not breaking fairness much.
533: int maxWrittenBytes = session.getConfig()
534: .getMaxReadBufferSize()
535: + (session.getConfig().getMaxReadBufferSize() >>> 1);
536: int writtenBytes = 0;
537: do {
538: // Check for pending writes.
539: WriteRequest req = session.getCurrentWriteRequest();
540: if (req == null) {
541: req = writeRequestQueue.poll(session);
542: if (req == null) {
543: break;
544: }
545: session.setCurrentWriteRequest(req);
546: }
547:
548: int localWrittenBytes = 0;
549: Object message = req.getMessage();
550: if (message instanceof IoBuffer) {
551: localWrittenBytes = writeBuffer(session, req,
552: hasFragmentation, maxWrittenBytes
553: - writtenBytes);
554: } else if (message instanceof FileRegion) {
555: localWrittenBytes = writeFile(session, req,
556: hasFragmentation, maxWrittenBytes
557: - writtenBytes);
558: } else {
559: throw new IllegalStateException(
560: "Don't know how to handle message of type '"
561: + message.getClass().getName()
562: + "'. Are you missing a protocol encoder?");
563: }
564:
565: writtenBytes += localWrittenBytes;
566:
567: if (localWrittenBytes == 0
568: || writtenBytes >= maxWrittenBytes) {
569: // Kernel buffer is full or wrote too much.
570: setInterestedInWrite(session, true);
571: return false;
572: }
573: } while (writtenBytes < maxWrittenBytes);
574: } catch (Exception e) {
575: session.getFilterChain().fireExceptionCaught(e);
576: return false;
577: }
578:
579: return true;
580: }
581:
582: private int writeBuffer(T session, WriteRequest req,
583: boolean hasFragmentation, int maxLength) throws Exception {
584: IoBuffer buf = (IoBuffer) req.getMessage();
585: int localWrittenBytes = 0;
586: if (buf.hasRemaining()) {
587: int length;
588: if (hasFragmentation) {
589: length = Math.min(buf.remaining(), maxLength);
590: } else {
591: length = buf.remaining();
592: }
593: for (int i = WRITE_SPIN_COUNT; i > 0; i--) {
594: localWrittenBytes = write(session, buf, length);
595: if (localWrittenBytes != 0) {
596: break;
597: }
598: }
599: }
600:
601: if (!buf.hasRemaining()
602: || (!hasFragmentation && localWrittenBytes != 0)) {
603: // Buffer has been sent, clear the current request.
604: buf.reset();
605: fireMessageSent(session, req);
606: }
607: return localWrittenBytes;
608: }
609:
610: private int writeFile(T session, WriteRequest req,
611: boolean hasFragmentation, int maxLength) throws Exception {
612: int localWrittenBytes;
613: FileRegion region = (FileRegion) req.getMessage();
614: if (region.getCount() > 0) {
615: int length;
616: if (hasFragmentation) {
617: length = (int) Math.min(region.getCount(), maxLength);
618: } else {
619: length = (int) Math.min(Integer.MAX_VALUE, region
620: .getCount());
621: }
622: localWrittenBytes = transferFile(session, region, length);
623: region
624: .setPosition(region.getPosition()
625: + localWrittenBytes);
626:
627: // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
628: // If there's still data to be written in the FileRegion, return 0 indicating that we need
629: // to pause until writing may resume.
630: if (localWrittenBytes > 0 && region.getCount() > 0) {
631: return 0;
632: }
633: } else {
634: localWrittenBytes = 0;
635: }
636:
637: if (region.getCount() <= 0
638: || (!hasFragmentation && localWrittenBytes != 0)) {
639: fireMessageSent(session, req);
640: }
641: return localWrittenBytes;
642: }
643:
644: private void fireMessageSent(T session, WriteRequest req) {
645: session.setCurrentWriteRequest(null);
646: session.getFilterChain().fireMessageSent(req);
647: }
648:
649: private void updateTrafficMask() {
650: for (;;) {
651: T session = trafficControllingSessions.poll();
652:
653: if (session == null) {
654: break;
655: }
656:
657: SessionState state = state(session);
658: switch (state) {
659: case OPEN:
660: updateTrafficMaskNow(session);
661: break;
662: case CLOSED:
663: break;
664: case PREPARING:
665: // Retry later if session is not yet fully initialized.
666: // (In case that Session.suspend??() or session.resume??() is
667: // called before addSession() is processed)
668: scheduleTrafficControl(session);
669: return;
670: default:
671: throw new IllegalStateException(String.valueOf(state));
672: }
673: }
674: }
675:
676: private void updateTrafficMaskNow(T session) {
677: // The normal is OP_READ and, if there are write requests in the
678: // session's write queue, set OP_WRITE to trigger flushing.
679: int mask = session.getTrafficMask().getInterestOps();
680: try {
681: setInterestedInRead(session,
682: (mask & SelectionKey.OP_READ) != 0);
683: } catch (Exception e) {
684: session.getFilterChain().fireExceptionCaught(e);
685: }
686: try {
687: setInterestedInWrite(session, !session
688: .getWriteRequestQueue().isEmpty(session)
689: && (mask & SelectionKey.OP_WRITE) != 0);
690: } catch (Exception e) {
691: session.getFilterChain().fireExceptionCaught(e);
692: }
693: }
694:
695: private class Worker implements Runnable {
696: public void run() {
697: int nSessions = 0;
698: lastIdleCheckTime = System.currentTimeMillis();
699:
700: for (;;) {
701: try {
702: boolean selected = select(1000);
703:
704: nSessions += add();
705: updateTrafficMask();
706:
707: if (selected) {
708: process();
709: }
710:
711: flush();
712: nSessions -= remove();
713: notifyIdleSessions();
714:
715: if (nSessions == 0) {
716: synchronized (lock) {
717: if (newSessions.isEmpty()) {
718: worker = null;
719: break;
720: }
721: }
722: }
723:
724: // Disconnect all sessions immediately if disposal has been
725: // requested so that we exit this loop eventually.
726: if (isDisposing()) {
727: for (Iterator<T> i = allSessions(); i.hasNext();) {
728: scheduleRemove(i.next());
729: }
730: wakeup();
731: }
732: } catch (Throwable t) {
733: ExceptionMonitor.getInstance().exceptionCaught(t);
734:
735: try {
736: Thread.sleep(1000);
737: } catch (InterruptedException e1) {
738: ExceptionMonitor.getInstance().exceptionCaught(
739: e1);
740: }
741: }
742: }
743:
744: if (isDisposing()) {
745: try {
746: dispose0();
747: } catch (Throwable t) {
748: ExceptionMonitor.getInstance().exceptionCaught(t);
749: } finally {
750: disposalFuture.setValue(true);
751: }
752: }
753: }
754: }
755:
756: protected static enum SessionState {
757: OPEN, CLOSED, PREPARING,
758: }
759: }
|