001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.io.File;
023: import java.io.FileInputStream;
024: import java.io.IOException;
025: import java.net.SocketAddress;
026: import java.nio.channels.FileChannel;
027: import java.util.Queue;
028: import java.util.Set;
029: import java.util.concurrent.atomic.AtomicBoolean;
030: import java.util.concurrent.atomic.AtomicInteger;
031: import java.util.concurrent.atomic.AtomicLong;
032:
033: import org.apache.mina.util.CircularQueue;
034:
035: /**
036: * Base implementation of {@link IoSession}.
037: *
038: * @author The Apache MINA Project (dev@mina.apache.org)
039: * @version $Rev: 627806 $, $Date: 2008-02-14 10:17:15 -0700 (Thu, 14 Feb 2008) $
040: */
041: public abstract class AbstractIoSession implements IoSession {
042:
043: private static final AttributeKey READY_READ_FUTURES = new AttributeKey(
044: AbstractIoSession.class, "readyReadFutures");
045: private static final AttributeKey WAITING_READ_FUTURES = new AttributeKey(
046: AbstractIoSession.class, "waitingReadFutures");
047:
048: private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
049: public void operationComplete(CloseFuture future) {
050: AbstractIoSession s = (AbstractIoSession) future
051: .getSession();
052: s.scheduledWriteBytes.set(0);
053: s.scheduledWriteMessages.set(0);
054: s.readBytesThroughput = 0;
055: s.readMessagesThroughput = 0;
056: s.writtenBytesThroughput = 0;
057: s.writtenMessagesThroughput = 0;
058: }
059: };
060:
061: /**
062: * An internal write request object that triggers session close.
063: * @see #writeRequestQueue
064: */
065: private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(
066: new Object());
067:
068: private final Object lock = new Object();
069:
070: private IoSessionAttributeMap attributes;
071: private WriteRequestQueue writeRequestQueue;
072: private WriteRequest currentWriteRequest;
073: private final long creationTime;
074:
075: /**
076: * A future that will be set 'closed' when the connection is closed.
077: */
078: private final CloseFuture closeFuture = new DefaultCloseFuture(this );
079:
080: private volatile boolean closing;
081: private volatile TrafficMask trafficMask = TrafficMask.ALL;
082:
083: // Status variables
084: private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
085: private final AtomicLong scheduledWriteBytes = new AtomicLong();
086: private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
087:
088: private long readBytes;
089: private long writtenBytes;
090: private long readMessages;
091: private long writtenMessages;
092: private long lastReadTime;
093: private long lastWriteTime;
094:
095: private long lastThroughputCalculationTime;
096: private long lastReadBytes;
097: private long lastWrittenBytes;
098: private long lastReadMessages;
099: private long lastWrittenMessages;
100: private double readBytesThroughput;
101: private double writtenBytesThroughput;
102: private double readMessagesThroughput;
103: private double writtenMessagesThroughput;
104:
105: private int idleCountForBoth;
106: private int idleCountForRead;
107: private int idleCountForWrite;
108:
109: private long lastIdleTimeForBoth;
110: private long lastIdleTimeForRead;
111: private long lastIdleTimeForWrite;
112:
113: private boolean deferDecreaseReadBuffer = true;
114:
115: protected AbstractIoSession() {
116: creationTime = lastThroughputCalculationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
117: .currentTimeMillis();
118: closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
119: }
120:
121: public final long getId() {
122: return hashCode() & 0xFFFFFFFFL;
123: }
124:
125: @SuppressWarnings("unchecked")
126: protected abstract IoProcessor getProcessor();
127:
128: public final boolean isConnected() {
129: return !closeFuture.isClosed();
130: }
131:
132: public final boolean isClosing() {
133: return closing || closeFuture.isClosed();
134: }
135:
136: public final CloseFuture getCloseFuture() {
137: return closeFuture;
138: }
139:
140: protected final boolean isScheduledForFlush() {
141: return scheduledForFlush.get();
142: }
143:
144: protected final boolean setScheduledForFlush(boolean flag) {
145: if (flag) {
146: return scheduledForFlush.compareAndSet(false, true);
147: } else {
148: scheduledForFlush.set(false);
149: return true;
150: }
151: }
152:
153: public final CloseFuture close(boolean rightNow) {
154: if (rightNow) {
155: return close();
156: } else {
157: return closeOnFlush();
158: }
159: }
160:
161: public final CloseFuture close() {
162: synchronized (lock) {
163: if (isClosing()) {
164: return closeFuture;
165: } else {
166: closing = true;
167: }
168: }
169:
170: getFilterChain().fireFilterClose();
171: return closeFuture;
172: }
173:
174: @SuppressWarnings("unchecked")
175: public final CloseFuture closeOnFlush() {
176: getWriteRequestQueue().offer(this , CLOSE_REQUEST);
177: getProcessor().flush(this );
178: return closeFuture;
179: }
180:
181: public final ReadFuture read() {
182: if (!getConfig().isUseReadOperation()) {
183: throw new IllegalStateException(
184: "useReadOperation is not enabled.");
185: }
186:
187: Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
188: ReadFuture future;
189: synchronized (readyReadFutures) {
190: future = readyReadFutures.poll();
191: if (future != null) {
192: if (future.isClosed()) {
193: // Let other readers get notified.
194: readyReadFutures.offer(future);
195: }
196: } else {
197: future = new DefaultReadFuture(this );
198: getWaitingReadFutures().offer(future);
199: }
200: }
201:
202: return future;
203: }
204:
205: protected final void offerReadFuture(Object message) {
206: newReadFuture().setRead(message);
207: }
208:
209: protected final void offerFailedReadFuture(Throwable exception) {
210: newReadFuture().setException(exception);
211: }
212:
213: protected final void offerClosedReadFuture() {
214: Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
215: synchronized (readyReadFutures) {
216: newReadFuture().setClosed();
217: }
218: }
219:
220: private ReadFuture newReadFuture() {
221: Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
222: Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
223: ReadFuture future;
224: synchronized (readyReadFutures) {
225: future = waitingReadFutures.poll();
226: if (future == null) {
227: future = new DefaultReadFuture(this );
228: readyReadFutures.offer(future);
229: }
230: }
231: return future;
232: }
233:
234: @SuppressWarnings("unchecked")
235: private Queue<ReadFuture> getReadyReadFutures() {
236: Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES);
237: if (readyReadFutures == null) {
238: readyReadFutures = new CircularQueue<ReadFuture>();
239:
240: Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
241: READY_READ_FUTURES, readyReadFutures);
242: if (oldReadyReadFutures != null) {
243: readyReadFutures = oldReadyReadFutures;
244: }
245:
246: // Initialize waitingReadFutures together.
247: Queue<ReadFuture> waitingReadFutures = new CircularQueue<ReadFuture>();
248: setAttributeIfAbsent(WAITING_READ_FUTURES,
249: waitingReadFutures);
250: }
251: return readyReadFutures;
252: }
253:
254: @SuppressWarnings("unchecked")
255: private Queue<ReadFuture> getWaitingReadFutures() {
256: return (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES);
257: }
258:
259: public final WriteFuture write(Object message) {
260: return write(message, null);
261: }
262:
263: public final WriteFuture write(Object message,
264: SocketAddress remoteAddress) {
265: if (message == null) {
266: throw new NullPointerException("message");
267: }
268:
269: if (!getTransportMetadata().isConnectionless()
270: && remoteAddress != null) {
271: throw new UnsupportedOperationException();
272: }
273:
274: if (isClosing() || !isConnected()) {
275: WriteFuture future = new DefaultWriteFuture(this );
276: WriteRequest request = new DefaultWriteRequest(message,
277: future, remoteAddress);
278: future.setException(new WriteToClosedSessionException(
279: request));
280: return future;
281: }
282:
283: FileChannel openedFileChannel = null;
284: try {
285: if (message instanceof IoBuffer
286: && !((IoBuffer) message).hasRemaining()) {
287: throw new IllegalArgumentException(
288: "message is empty. Forgot to call flip()?");
289: } else if (message instanceof FileChannel) {
290: FileChannel fileChannel = (FileChannel) message;
291: message = new DefaultFileRegion(fileChannel, 0,
292: fileChannel.size());
293: } else if (message instanceof File) {
294: File file = (File) message;
295: openedFileChannel = new FileInputStream(file)
296: .getChannel();
297: message = new DefaultFileRegion(openedFileChannel, 0,
298: openedFileChannel.size());
299: }
300: } catch (IOException e) {
301: ExceptionMonitor.getInstance().exceptionCaught(e);
302: return DefaultWriteFuture.newNotWrittenFuture(this , e);
303: }
304:
305: WriteFuture future = new DefaultWriteFuture(this );
306: getFilterChain()
307: .fireFilterWrite(
308: new DefaultWriteRequest(message, future,
309: remoteAddress));
310:
311: if (openedFileChannel != null) {
312: // If we opened a FileChannel, it needs to be closed when the write has completed
313: final FileChannel finalChannel = openedFileChannel;
314: future.addListener(new IoFutureListener<WriteFuture>() {
315: public void operationComplete(WriteFuture future) {
316: try {
317: finalChannel.close();
318: } catch (IOException e) {
319: ExceptionMonitor.getInstance().exceptionCaught(
320: e);
321: }
322: }
323: });
324: }
325:
326: return future;
327: }
328:
329: public final Object getAttachment() {
330: return getAttribute("");
331: }
332:
333: public final Object setAttachment(Object attachment) {
334: return setAttribute("", attachment);
335: }
336:
337: public final Object getAttribute(Object key) {
338: return getAttribute(key, null);
339: }
340:
341: public final Object getAttribute(Object key, Object defaultValue) {
342: return attributes.getAttribute(this , key, defaultValue);
343: }
344:
345: public final Object setAttribute(Object key, Object value) {
346: return attributes.setAttribute(this , key, value);
347: }
348:
349: public final Object setAttribute(Object key) {
350: return setAttribute(key, Boolean.TRUE);
351: }
352:
353: public final Object setAttributeIfAbsent(Object key, Object value) {
354: return attributes.setAttributeIfAbsent(this , key, value);
355: }
356:
357: public final Object setAttributeIfAbsent(Object key) {
358: return setAttributeIfAbsent(key, Boolean.TRUE);
359: }
360:
361: public final Object removeAttribute(Object key) {
362: return attributes.removeAttribute(this , key);
363: }
364:
365: public final boolean removeAttribute(Object key, Object value) {
366: return attributes.removeAttribute(this , key, value);
367: }
368:
369: public final boolean replaceAttribute(Object key, Object oldValue,
370: Object newValue) {
371: return attributes.replaceAttribute(this , key, oldValue,
372: newValue);
373: }
374:
375: public final boolean containsAttribute(Object key) {
376: return attributes.containsAttribute(this , key);
377: }
378:
379: public final Set<Object> getAttributeKeys() {
380: return attributes.getAttributeKeys(this );
381: }
382:
383: protected final IoSessionAttributeMap getAttributeMap() {
384: return attributes;
385: }
386:
387: protected final void setAttributeMap(
388: IoSessionAttributeMap attributes) {
389: this .attributes = attributes;
390: }
391:
392: protected final void setWriteRequestQueue(
393: WriteRequestQueue writeRequestQueue) {
394: this .writeRequestQueue = new CloseRequestAwareWriteRequestQueue(
395: writeRequestQueue);
396: }
397:
398: public final TrafficMask getTrafficMask() {
399: return trafficMask;
400: }
401:
402: public final void setTrafficMask(TrafficMask trafficMask) {
403: if (trafficMask == null) {
404: throw new NullPointerException("trafficMask");
405: }
406:
407: if (isClosing() || !isConnected()) {
408: return;
409: }
410:
411: getFilterChain().fireFilterSetTrafficMask(trafficMask);
412: }
413:
414: protected final void setTrafficMaskNow(TrafficMask trafficMask) {
415: this .trafficMask = trafficMask;
416: }
417:
418: public final void suspendRead() {
419: setTrafficMask(getTrafficMask().and(TrafficMask.READ.not()));
420: }
421:
422: public final void suspendWrite() {
423: setTrafficMask(getTrafficMask().and(TrafficMask.WRITE.not()));
424: }
425:
426: public final void resumeRead() {
427: setTrafficMask(getTrafficMask().or(TrafficMask.READ));
428: }
429:
430: public final void resumeWrite() {
431: setTrafficMask(getTrafficMask().or(TrafficMask.WRITE));
432: }
433:
434: public final long getReadBytes() {
435: return readBytes;
436: }
437:
438: public final long getWrittenBytes() {
439: return writtenBytes;
440: }
441:
442: public final long getReadMessages() {
443: return readMessages;
444: }
445:
446: public final long getWrittenMessages() {
447: return writtenMessages;
448: }
449:
450: public final double getReadBytesThroughput() {
451: return readBytesThroughput;
452: }
453:
454: public final double getWrittenBytesThroughput() {
455: return writtenBytesThroughput;
456: }
457:
458: public final double getReadMessagesThroughput() {
459: return readMessagesThroughput;
460: }
461:
462: public final double getWrittenMessagesThroughput() {
463: return writtenMessagesThroughput;
464: }
465:
466: /**
467: * Update all statistical properties related with throughput assuming
468: * the specified time is the current time. By default this method returns
469: * silently without updating the throughput properties if they were
470: * calculated already within last
471: * {@link IoSessionConfig#getThroughputCalculationInterval() calculation interval}.
472: * If, however, <tt>force</tt> is specified as <tt>true</tt>, this method
473: * updates the throughput properties immediately.
474:
475: * @param currentTime the current time in milliseconds
476: */
477: protected final void updateThroughput(long currentTime,
478: boolean force) {
479: int interval = (int) (currentTime - lastThroughputCalculationTime);
480:
481: long minInterval = getConfig()
482: .getThroughputCalculationIntervalInMillis();
483: if (minInterval == 0 || interval < minInterval) {
484: if (!force) {
485: return;
486: }
487: }
488:
489: readBytesThroughput = (readBytes - lastReadBytes) * 1000.0
490: / interval;
491: writtenBytesThroughput = (writtenBytes - lastWrittenBytes)
492: * 1000.0 / interval;
493: readMessagesThroughput = (readMessages - lastReadMessages)
494: * 1000.0 / interval;
495: writtenMessagesThroughput = (writtenMessages - lastWrittenMessages)
496: * 1000.0 / interval;
497:
498: lastReadBytes = readBytes;
499: lastWrittenBytes = writtenBytes;
500: lastReadMessages = readMessages;
501: lastWrittenMessages = writtenMessages;
502:
503: lastThroughputCalculationTime = currentTime;
504: }
505:
506: public final long getScheduledWriteBytes() {
507: return scheduledWriteBytes.get();
508: }
509:
510: public final int getScheduledWriteMessages() {
511: return scheduledWriteMessages.get();
512: }
513:
514: protected void setScheduledWriteBytes(long byteCount) {
515: scheduledWriteBytes.set(byteCount);
516: }
517:
518: protected void setScheduledWriteMessages(int messages) {
519: scheduledWriteMessages.set(messages);
520: }
521:
522: protected final void increaseReadBytes(long increment,
523: long currentTime) {
524: if (increment <= 0) {
525: return;
526: }
527:
528: readBytes += increment;
529: lastReadTime = currentTime;
530: idleCountForBoth = 0;
531: idleCountForRead = 0;
532:
533: if (getService() instanceof AbstractIoService) {
534: ((AbstractIoService) getService()).increaseReadBytes(
535: increment, currentTime);
536: }
537: }
538:
539: protected final void increaseReadMessages(long currentTime) {
540: readMessages++;
541: lastReadTime = currentTime;
542: idleCountForBoth = 0;
543: idleCountForRead = 0;
544:
545: if (getService() instanceof AbstractIoService) {
546: ((AbstractIoService) getService())
547: .increaseReadMessages(currentTime);
548: }
549: }
550:
551: protected final void increaseWrittenBytesAndMessages(
552: WriteRequest request, long currentTime) {
553:
554: Object message = request.getMessage();
555: if (message instanceof IoBuffer) {
556: IoBuffer b = (IoBuffer) message;
557: if (b.hasRemaining()) {
558: increaseWrittenBytes(((IoBuffer) message).remaining(),
559: currentTime);
560: } else {
561: increaseWrittenMessages(currentTime);
562: }
563: } else if (message instanceof FileRegion) {
564: FileRegion region = (FileRegion) message;
565: if (region.getCount() == 0) {
566: increaseWrittenBytes(region.getWrittenBytes(),
567: currentTime);
568: increaseWrittenMessages(currentTime);
569: }
570: } else {
571: increaseWrittenMessages(currentTime);
572: }
573: }
574:
575: private void increaseWrittenBytes(long increment, long currentTime) {
576: if (increment <= 0) {
577: return;
578: }
579:
580: writtenBytes += increment;
581: lastWriteTime = currentTime;
582: idleCountForBoth = 0;
583: idleCountForWrite = 0;
584:
585: if (getService() instanceof AbstractIoService) {
586: ((AbstractIoService) getService()).increaseWrittenBytes(
587: increment, currentTime);
588: }
589:
590: increaseScheduledWriteBytes(-increment);
591: }
592:
593: private void increaseWrittenMessages(long currentTime) {
594: writtenMessages++;
595: lastWriteTime = currentTime;
596: if (getService() instanceof AbstractIoService) {
597: ((AbstractIoService) getService())
598: .increaseWrittenMessages(currentTime);
599: }
600:
601: decreaseScheduledWriteMessages();
602: }
603:
604: protected final void increaseScheduledWriteBytes(long increment) {
605: scheduledWriteBytes.addAndGet(increment);
606: if (getService() instanceof AbstractIoService) {
607: ((AbstractIoService) getService())
608: .increaseScheduledWriteBytes(increment);
609: }
610: }
611:
612: protected final void increaseScheduledWriteMessages() {
613: scheduledWriteMessages.incrementAndGet();
614: if (getService() instanceof AbstractIoService) {
615: ((AbstractIoService) getService())
616: .increaseScheduledWriteMessages();
617: }
618: }
619:
620: private void decreaseScheduledWriteMessages() {
621: scheduledWriteMessages.decrementAndGet();
622: if (getService() instanceof AbstractIoService) {
623: ((AbstractIoService) getService())
624: .decreaseScheduledWriteMessages();
625: }
626: }
627:
628: protected final void decreaseScheduledBytesAndMessages(
629: WriteRequest request) {
630: Object message = request.getMessage();
631: if (message instanceof IoBuffer) {
632: IoBuffer b = (IoBuffer) message;
633: if (b.hasRemaining()) {
634: increaseScheduledWriteBytes(-((IoBuffer) message)
635: .remaining());
636: } else {
637: decreaseScheduledWriteMessages();
638: }
639: } else {
640: decreaseScheduledWriteMessages();
641: }
642: }
643:
644: protected final WriteRequestQueue getWriteRequestQueue() {
645: if (writeRequestQueue == null) {
646: throw new IllegalStateException();
647: }
648: return writeRequestQueue;
649: }
650:
651: protected final WriteRequest getCurrentWriteRequest() {
652: return currentWriteRequest;
653: }
654:
655: protected final void setCurrentWriteRequest(
656: WriteRequest currentWriteRequest) {
657: this .currentWriteRequest = currentWriteRequest;
658: }
659:
660: protected final void increaseReadBufferSize() {
661: int newReadBufferSize = getConfig().getReadBufferSize() << 1;
662: if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
663: getConfig().setReadBufferSize(newReadBufferSize);
664: } else {
665: getConfig().setReadBufferSize(
666: getConfig().getMaxReadBufferSize());
667: }
668:
669: deferDecreaseReadBuffer = true;
670: }
671:
672: protected final void decreaseReadBufferSize() {
673: if (deferDecreaseReadBuffer) {
674: deferDecreaseReadBuffer = false;
675: return;
676: }
677:
678: if (getConfig().getReadBufferSize() > getConfig()
679: .getMinReadBufferSize()) {
680: getConfig().setReadBufferSize(
681: getConfig().getReadBufferSize() >>> 1);
682: }
683:
684: deferDecreaseReadBuffer = true;
685: }
686:
687: public final long getCreationTime() {
688: return creationTime;
689: }
690:
691: public final long getLastIoTime() {
692: return Math.max(lastReadTime, lastWriteTime);
693: }
694:
695: public final long getLastReadTime() {
696: return lastReadTime;
697: }
698:
699: public final long getLastWriteTime() {
700: return lastWriteTime;
701: }
702:
703: public final boolean isIdle(IdleStatus status) {
704: if (status == IdleStatus.BOTH_IDLE) {
705: return idleCountForBoth > 0;
706: }
707:
708: if (status == IdleStatus.READER_IDLE) {
709: return idleCountForRead > 0;
710: }
711:
712: if (status == IdleStatus.WRITER_IDLE) {
713: return idleCountForWrite > 0;
714: }
715:
716: throw new IllegalArgumentException("Unknown idle status: "
717: + status);
718: }
719:
720: public final boolean isBothIdle() {
721: return isIdle(IdleStatus.BOTH_IDLE);
722: }
723:
724: public final boolean isReaderIdle() {
725: return isIdle(IdleStatus.READER_IDLE);
726: }
727:
728: public final boolean isWriterIdle() {
729: return isIdle(IdleStatus.WRITER_IDLE);
730: }
731:
732: public final int getIdleCount(IdleStatus status) {
733: if (getConfig().getIdleTime(status) == 0) {
734: if (status == IdleStatus.BOTH_IDLE) {
735: idleCountForBoth = 0;
736: }
737:
738: if (status == IdleStatus.READER_IDLE) {
739: idleCountForRead = 0;
740: }
741:
742: if (status == IdleStatus.WRITER_IDLE) {
743: idleCountForWrite = 0;
744: }
745: }
746:
747: if (status == IdleStatus.BOTH_IDLE) {
748: return idleCountForBoth;
749: }
750:
751: if (status == IdleStatus.READER_IDLE) {
752: return idleCountForRead;
753: }
754:
755: if (status == IdleStatus.WRITER_IDLE) {
756: return idleCountForWrite;
757: }
758:
759: throw new IllegalArgumentException("Unknown idle status: "
760: + status);
761: }
762:
763: public final long getLastIdleTime(IdleStatus status) {
764: if (status == IdleStatus.BOTH_IDLE) {
765: return lastIdleTimeForBoth;
766: }
767:
768: if (status == IdleStatus.READER_IDLE) {
769: return lastIdleTimeForRead;
770: }
771:
772: if (status == IdleStatus.WRITER_IDLE) {
773: return lastIdleTimeForWrite;
774: }
775:
776: throw new IllegalArgumentException("Unknown idle status: "
777: + status);
778: }
779:
780: protected final void increaseIdleCount(IdleStatus status,
781: long currentTime) {
782: if (status == IdleStatus.BOTH_IDLE) {
783: idleCountForBoth++;
784: lastIdleTimeForBoth = currentTime;
785: } else if (status == IdleStatus.READER_IDLE) {
786: idleCountForRead++;
787: lastIdleTimeForRead = currentTime;
788: } else if (status == IdleStatus.WRITER_IDLE) {
789: idleCountForWrite++;
790: lastIdleTimeForWrite = currentTime;
791: } else {
792: throw new IllegalArgumentException("Unknown idle status: "
793: + status);
794: }
795: }
796:
797: public final int getBothIdleCount() {
798: return getIdleCount(IdleStatus.BOTH_IDLE);
799: }
800:
801: public final long getLastBothIdleTime() {
802: return getLastIdleTime(IdleStatus.BOTH_IDLE);
803: }
804:
805: public final long getLastReaderIdleTime() {
806: return getLastIdleTime(IdleStatus.READER_IDLE);
807: }
808:
809: public final long getLastWriterIdleTime() {
810: return getLastIdleTime(IdleStatus.WRITER_IDLE);
811: }
812:
813: public final int getReaderIdleCount() {
814: return getIdleCount(IdleStatus.READER_IDLE);
815: }
816:
817: public final int getWriterIdleCount() {
818: return getIdleCount(IdleStatus.WRITER_IDLE);
819: }
820:
821: public SocketAddress getServiceAddress() {
822: IoService service = getService();
823: if (service instanceof IoAcceptor) {
824: return ((IoAcceptor) service).getLocalAddress();
825: } else {
826: return getRemoteAddress();
827: }
828: }
829:
830: @Override
831: public final int hashCode() {
832: return super .hashCode();
833: }
834:
835: @Override
836: public final boolean equals(Object o) {
837: return super .equals(o);
838: }
839:
840: @Override
841: public String toString() {
842: if (getService() instanceof IoAcceptor) {
843: return "(" + getIdAsString() + ": " + getServiceName()
844: + ", server, " + getRemoteAddress() + " => "
845: + getLocalAddress() + ')';
846: } else {
847: return "(" + getIdAsString() + ": " + getServiceName()
848: + ", client, " + getLocalAddress() + " => "
849: + getRemoteAddress() + ')';
850: }
851: }
852:
853: private String getIdAsString() {
854: String id = Long.toHexString(getId()).toUpperCase();
855:
856: // Somewhat inefficient, but it won't happen that often
857: // because an ID is often a big integer.
858: while (id.length() < 8) {
859: id = '0' + id; // padding
860: }
861: id = "0x" + id;
862:
863: return id;
864: }
865:
866: private String getServiceName() {
867: TransportMetadata tm = getTransportMetadata();
868: if (tm == null) {
869: return "null";
870: } else {
871: return tm.getProviderName() + ' ' + tm.getName();
872: }
873: }
874:
875: private class CloseRequestAwareWriteRequestQueue implements
876: WriteRequestQueue {
877:
878: private final WriteRequestQueue q;
879:
880: public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
881: this .q = q;
882: }
883:
884: public synchronized WriteRequest poll(IoSession session) {
885: WriteRequest answer = q.poll(session);
886: if (answer == CLOSE_REQUEST) {
887: AbstractIoSession.this .close();
888: dispose(session);
889: answer = null;
890: }
891: return answer;
892: }
893:
894: public void offer(IoSession session, WriteRequest e) {
895: q.offer(session, e);
896: }
897:
898: public boolean isEmpty(IoSession session) {
899: return q.isEmpty(session);
900: }
901:
902: public void clear(IoSession session) {
903: q.clear(session);
904: }
905:
906: public void dispose(IoSession session) {
907: q.dispose(session);
908: }
909: }
910: }
|