001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.util.Iterator;
023: import java.util.Set;
024: import java.util.concurrent.Executors;
025: import java.util.concurrent.ScheduledExecutorService;
026: import java.util.concurrent.TimeUnit;
027:
028: import org.apache.mina.util.ConcurrentHashSet;
029: import org.apache.mina.util.NamePreservingRunnable;
030:
031: /**
032: * Detects idle sessions and fires <tt>sessionIdle</tt> events to them.
033: *
034: * @author The Apache MINA Project (dev@mina.apache.org)
035: * @version $Rev: 525369 $, $Date: 2007-04-04 05:05:11 +0200 (mer., 04 avr. 2007) $
036: */
037: public class IdleStatusChecker {
038: private static final IdleStatusChecker INSTANCE = new IdleStatusChecker();
039:
040: public static IdleStatusChecker getInstance() {
041: return INSTANCE;
042: }
043:
044: private final Set<AbstractIoSession> sessions = new ConcurrentHashSet<AbstractIoSession>();
045: private final Set<AbstractIoService> services = new ConcurrentHashSet<AbstractIoService>();
046:
047: private final Object lock = new Object();
048: private final Runnable notifyingTask = new NamePreservingRunnable(
049: new NotifyingTask(), "IdleStatusChecker");
050: private final IoFutureListener<IoFuture> sessionCloseListener = new SessionCloseListener();
051: private volatile ScheduledExecutorService executor;
052:
053: private IdleStatusChecker() {
054: }
055:
056: public void addSession(AbstractIoSession session) {
057: synchronized (lock) {
058: boolean start = false;
059: if (sessions.isEmpty() && services.isEmpty()) {
060: start = true;
061: }
062: if (!sessions.add(session)) {
063: return;
064: }
065: if (start) {
066: start();
067: }
068: }
069:
070: session.getCloseFuture().addListener(sessionCloseListener);
071: }
072:
073: public void addService(AbstractIoService service) {
074: synchronized (lock) {
075: boolean start = false;
076: if (sessions.isEmpty() && services.isEmpty()) {
077: start = true;
078: }
079: if (!services.add(service)) {
080: return;
081: }
082: if (start) {
083: start();
084: }
085: }
086: }
087:
088: public void removeSession(AbstractIoSession session) {
089: synchronized (lock) {
090: sessions.remove(session);
091: if (sessions.isEmpty() && services.isEmpty()) {
092: stop();
093: }
094: }
095: }
096:
097: public void removeService(AbstractIoService service) {
098: synchronized (lock) {
099: services.remove(service);
100: if (sessions.isEmpty() && services.isEmpty()) {
101: stop();
102: }
103: }
104: }
105:
106: private void start() {
107: ScheduledExecutorService executor = Executors
108: .newScheduledThreadPool(1);
109: this .executor = executor;
110: executor.scheduleWithFixedDelay(notifyingTask, 1000, 1000,
111: TimeUnit.MILLISECONDS);
112: }
113:
114: private void stop() {
115: ScheduledExecutorService executor = this .executor;
116: if (executor == null) {
117: return;
118: }
119: executor.shutdownNow();
120: this .executor = null;
121: }
122:
123: private class NotifyingTask implements Runnable {
124: public void run() {
125: long currentTime = System.currentTimeMillis();
126: notifyServices(currentTime);
127: notifySessions(currentTime);
128: }
129:
130: private void notifyServices(long currentTime) {
131: Iterator<AbstractIoService> it = services.iterator();
132: while (it.hasNext()) {
133: AbstractIoService service = it.next();
134: if (service.isActive()) {
135: notifyIdleness(service, currentTime, false);
136: }
137: }
138: }
139:
140: private void notifySessions(long currentTime) {
141: Iterator<AbstractIoSession> it = sessions.iterator();
142: while (it.hasNext()) {
143: AbstractIoSession session = it.next();
144: if (session.isConnected()) {
145: notifyIdleSession(session, currentTime);
146: }
147: }
148: }
149: }
150:
151: private class SessionCloseListener implements
152: IoFutureListener<IoFuture> {
153: public void operationComplete(IoFuture future) {
154: removeSession((AbstractIoSession) future.getSession());
155: }
156: }
157:
158: /**
159: * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable
160: * sessions in the specified collection.
161: *
162: * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
163: */
164: public static void notifyIdleness(
165: Iterator<? extends IoSession> sessions, long currentTime) {
166: IoSession s = null;
167: while (sessions.hasNext()) {
168: s = sessions.next();
169: notifyIdleSession(s, currentTime);
170: }
171: }
172:
173: public static void notifyIdleness(IoService service,
174: long currentTime) {
175: notifyIdleness(service, currentTime, true);
176: }
177:
178: private static void notifyIdleness(IoService service,
179: long currentTime, boolean includeSessions) {
180: if (!(service instanceof AbstractIoService)) {
181: return;
182: }
183:
184: ((AbstractIoService) service).notifyIdleness(currentTime);
185:
186: if (includeSessions) {
187: notifyIdleness(service.getManagedSessions().iterator(),
188: currentTime);
189: }
190: }
191:
192: /**
193: * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
194: * specified {@code session}.
195: *
196: * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
197: */
198: public static void notifyIdleSession(IoSession session,
199: long currentTime) {
200: if (session instanceof AbstractIoSession) {
201: AbstractIoSession s = (AbstractIoSession) session;
202: notifyIdleSession1(s, currentTime, s.getConfig()
203: .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
204: IdleStatus.BOTH_IDLE, Math.max(s.getLastIoTime(), s
205: .getLastIdleTime(IdleStatus.BOTH_IDLE)));
206:
207: notifyIdleSession1(s, currentTime, s.getConfig()
208: .getIdleTimeInMillis(IdleStatus.READER_IDLE),
209: IdleStatus.READER_IDLE, Math.max(s
210: .getLastReadTime(), s
211: .getLastIdleTime(IdleStatus.READER_IDLE)));
212:
213: notifyIdleSession1(s, currentTime, s.getConfig()
214: .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
215: IdleStatus.WRITER_IDLE, Math.max(s
216: .getLastWriteTime(), s
217: .getLastIdleTime(IdleStatus.WRITER_IDLE)));
218:
219: notifyWriteTimeout(s, currentTime);
220: updateThroughput(s, currentTime);
221: } else {
222: notifyIdleSession0(session, currentTime, session
223: .getConfig().getIdleTimeInMillis(
224: IdleStatus.BOTH_IDLE),
225: IdleStatus.BOTH_IDLE, Math.max(session
226: .getLastIoTime(), session
227: .getLastIdleTime(IdleStatus.BOTH_IDLE)));
228:
229: notifyIdleSession0(session, currentTime, session
230: .getConfig().getIdleTimeInMillis(
231: IdleStatus.READER_IDLE),
232: IdleStatus.READER_IDLE, Math.max(session
233: .getLastReadTime(), session
234: .getLastIdleTime(IdleStatus.READER_IDLE)));
235:
236: notifyIdleSession0(session, currentTime, session
237: .getConfig().getIdleTimeInMillis(
238: IdleStatus.WRITER_IDLE),
239: IdleStatus.WRITER_IDLE, Math.max(session
240: .getLastWriteTime(), session
241: .getLastIdleTime(IdleStatus.WRITER_IDLE)));
242: }
243: }
244:
245: private static void notifyIdleSession0(IoSession session,
246: long currentTime, long idleTime, IdleStatus status,
247: long lastIoTime) {
248: if (idleTime > 0 && lastIoTime != 0
249: && currentTime - lastIoTime >= idleTime) {
250: session.getFilterChain().fireSessionIdle(status);
251: }
252: }
253:
254: private static void notifyIdleSession1(AbstractIoSession session,
255: long currentTime, long idleTime, IdleStatus status,
256: long lastIoTime) {
257: if (idleTime > 0 && lastIoTime != 0
258: && currentTime - lastIoTime >= idleTime) {
259: session.getFilterChain().fireSessionIdle(status);
260: }
261: }
262:
263: private static void notifyWriteTimeout(AbstractIoSession session,
264: long currentTime) {
265:
266: long writeTimeout = session.getConfig()
267: .getWriteTimeoutInMillis();
268: if (writeTimeout > 0
269: && currentTime - session.getLastWriteTime() >= writeTimeout
270: && !session.getWriteRequestQueue().isEmpty(session)) {
271: WriteRequest request = session.getCurrentWriteRequest();
272: if (request != null) {
273: session.setCurrentWriteRequest(null);
274: WriteTimeoutException cause = new WriteTimeoutException(
275: request);
276: request.getFuture().setException(cause);
277: session.getFilterChain().fireExceptionCaught(cause);
278: // WriteException is an IOException, so we close the session.
279: session.close();
280: }
281: }
282: }
283:
284: private static void updateThroughput(AbstractIoSession session,
285: long currentTime) {
286: session.updateThroughput(currentTime, false);
287: }
288: }
|