001: package org.apache.mina.management;
002:
003: import org.apache.mina.common.*;
004: import org.apache.mina.filter.executor.ExecutorFilter;
005:
006: import java.net.SocketAddress;
007: import java.util.Queue;
008: import java.util.Set;
009: import java.util.concurrent.ConcurrentLinkedQueue;
010: import java.util.concurrent.atomic.AtomicLong;
011:
012: /**
013: * Collects statistics of an {@link org.apache.mina.common.IoService}. It's polling all the sessions of a given
014: * IoService. It's attaching a {@link org.apache.mina.management.IoSessionStat} object to all the sessions polled
015: * and filling the throughput values.
016: *
017: * Usage :
018: * <pre>
019: * IoService service = ...
020: * MINAStatCollector collector = new MINAStatCollector( service );
021: * collector.start();
022: * </pre>
023: *
024: * By default the {@link org.apache.mina.management.MINAStatCollector} is polling the sessions every 5 seconds. You can
025: * give a different polling time using a second constructor.<p>
026: *
027: * Note: This class is a spin-off from StatCollector present in
028: * https://svn.apache.org/repos/asf/mina/branches/1.1/core/src/main/java/org/apache/mina/management.
029: *
030: * @author The Apache Directory Project (mina-dev@directory.apache.org)
031: * @version $Rev: 477648 $, $Date: 2006-11-21 04:33:38 -0800 (Tue, 21 Nov 2006) $
032: */
033: public class MINAStatCollector {
034: /**
035: * The session attribute key for {@link org.apache.mina.management.IoSessionStat}.
036: */
037: public static final String KEY = MINAStatCollector.class.getName()
038: + ".stat";
039:
040: /**
041: * @noinspection StaticNonFinalField
042: */
043: private static volatile int nextId = 0;
044: private final int id = nextId++;
045:
046: private final IoService service;
047: private Worker worker;
048: private int pollingInterval = 5000;
049: private Queue<IoSession> polledSessions;
050:
051: // resume of session stats, for simplifying acces to the statistics
052: private AtomicLong totalProcessedSessions = new AtomicLong();
053: private AtomicLong totalMsgWritten = new AtomicLong();
054: private AtomicLong totalMsgRead = new AtomicLong();
055: private AtomicLong totalBytesWritten = new AtomicLong();
056: private AtomicLong totalBytesRead = new AtomicLong();
057: private AtomicLong totalScheduledWrites = new AtomicLong();
058: private AtomicLong totalQueuedEvents = new AtomicLong();
059:
060: private final IoServiceListener serviceListener = new IoServiceListener() {
061: public void serviceActivated(IoService service,
062: SocketAddress serviceAddress, IoHandler handler,
063: IoServiceConfig config) {
064: }
065:
066: public void serviceDeactivated(IoService service,
067: SocketAddress serviceAddress, IoHandler handler,
068: IoServiceConfig config) {
069: }
070:
071: public void sessionCreated(IoSession session) {
072: addSession(session);
073: }
074:
075: public void sessionDestroyed(IoSession session) {
076: removeSession(session);
077: }
078: };
079:
080: /**
081: * Create a stat collector for the given service with a default polling time of 5 seconds.
082: * @param service the IoService to inspect
083: */
084: public MINAStatCollector(IoService service) {
085: this (service, 5000);
086: }
087:
088: /**
089: * create a stat collector for the given given service
090: * @param service the IoService to inspect
091: * @param pollingInterval milliseconds
092: */
093: public MINAStatCollector(IoService service, int pollingInterval) {
094: this .service = service;
095: this .pollingInterval = pollingInterval;
096: }
097:
098: /**
099: * Start collecting stats for the {@link org.apache.mina.common.IoSession} of the service.
100: * New sessions or destroyed will be automaticly added or removed.
101: */
102: public void start() {
103: synchronized (this ) {
104: if (worker != null && worker.isAlive())
105: throw new RuntimeException(
106: "Stat collecting already started");
107:
108: // add all current sessions
109:
110: polledSessions = new ConcurrentLinkedQueue<IoSession>();
111:
112: Set<SocketAddress> addresses = service
113: .getManagedServiceAddresses();
114: if (addresses != null) {
115: for (SocketAddress element : addresses) {
116: for (IoSession ioSession : service
117: .getManagedSessions(element)) {
118: addSession(ioSession);
119: }
120: }
121: }
122:
123: // listen for new ones
124: service.addListener(serviceListener);
125:
126: // start polling
127: worker = new Worker();
128: worker.start();
129:
130: }
131:
132: }
133:
134: /**
135: * Stop collecting stats. all the {@link org.apache.mina.management.IoSessionStat} object will be removed of the
136: * polled session attachements.
137: */
138: public void stop() {
139: synchronized (this ) {
140: service.removeListener(serviceListener);
141:
142: // stop worker
143: worker.stop = true;
144: worker.interrupt();
145: while (worker.isAlive()) {
146: try {
147: worker.join();
148: } catch (InterruptedException e) {
149: //ignore since this is shutdown time
150: }
151: }
152:
153: for (IoSession session : polledSessions) {
154: session.removeAttribute(KEY);
155: }
156: polledSessions.clear();
157: }
158: }
159:
160: /**
161: * is the stat collector started and polling the {@link org.apache.mina.common.IoSession} of the {@link org.apache.mina.common.IoService}
162: * @return true if started
163: */
164: public boolean isRunning() {
165: synchronized (this ) {
166: return worker != null && worker.stop != true;
167: }
168: }
169:
170: private void addSession(IoSession session) {
171: IoSessionStat sessionStats = new IoSessionStat();
172: sessionStats.lastPollingTime = System.currentTimeMillis();
173: session.setAttribute(KEY, sessionStats);
174: totalProcessedSessions.incrementAndGet();
175: polledSessions.add(session);
176: }
177:
178: private void removeSession(IoSession session) {
179: // remove the session from the list of polled sessions
180: polledSessions.remove(session);
181:
182: // add the bytes processed between last polling and session closing
183: // prevent non seen byte with non-connected protocols like HTTP and datagrams
184: IoSessionStat sessStat = (IoSessionStat) session
185: .getAttribute(KEY);
186: session.removeAttribute(KEY);
187:
188: totalMsgWritten.addAndGet(session.getWrittenMessages()
189: - sessStat.lastMessageWrite);
190: totalMsgRead.addAndGet(session.getReadMessages()
191: - sessStat.lastMessageRead);
192: totalBytesWritten.addAndGet(session.getWrittenBytes()
193: - sessStat.lastByteWrite);
194: totalBytesRead.addAndGet(session.getReadBytes()
195: - sessStat.lastByteRead);
196: }
197:
198: /**
199: * total number of sessions processed by the stat collector
200: * @return number of sessions
201: */
202: public long getTotalProcessedSessions() {
203: return totalProcessedSessions.longValue();
204: }
205:
206: public long getBytesRead() {
207: return totalBytesRead.get();
208: }
209:
210: public long getBytesWritten() {
211: return totalBytesWritten.get();
212: }
213:
214: public long getMsgRead() {
215: return totalMsgRead.get();
216: }
217:
218: public long getMsgWritten() {
219: return totalMsgWritten.get();
220: }
221:
222: public long getScheduledWrites() {
223: return totalScheduledWrites.get();
224: }
225:
226: public long getQueuedEvents() {
227: return totalQueuedEvents.get();
228: }
229:
230: public long getSessionCount() {
231: return polledSessions.size();
232: }
233:
234: private class Worker extends Thread {
235:
236: boolean stop = false;
237:
238: private Worker() {
239: super ("StatCollectorWorker-" + id);
240: }
241:
242: public void run() {
243: while (!stop) {
244: // wait polling time
245: try {
246: Thread.sleep(pollingInterval);
247: } catch (InterruptedException e) {
248: }
249:
250: long tmpMsgWritten = 0l;
251: long tmpMsgRead = 0l;
252: long tmpBytesWritten = 0l;
253: long tmpBytesRead = 0l;
254: long tmpScheduledWrites = 0l;
255: long tmpQueuevedEvents = 0l;
256:
257: for (IoSession session : polledSessions) {
258: // upadating individual session statistics
259: IoSessionStat sessStat = (IoSessionStat) session
260: .getAttribute(KEY);
261:
262: long currentTimestamp = System.currentTimeMillis();
263: // Calculate delta
264: float pollDelta = (currentTimestamp - sessStat.lastPollingTime) / 1000f;
265: // Store last polling time of this session
266: sessStat.lastPollingTime = currentTimestamp;
267:
268: long readBytes = session.getReadBytes();
269: long writtenBytes = session.getWrittenBytes();
270: long readMessages = session.getReadMessages();
271: long writtenMessages = session.getWrittenMessages();
272: sessStat.byteReadThroughput = (readBytes - sessStat.lastByteRead)
273: / pollDelta;
274: sessStat.byteWrittenThroughput = (writtenBytes - sessStat.lastByteWrite)
275: / pollDelta;
276: sessStat.messageReadThroughput = (readMessages - sessStat.lastMessageRead)
277: / pollDelta;
278: sessStat.messageWrittenThroughput = (writtenMessages - sessStat.lastMessageWrite)
279: / pollDelta;
280:
281: tmpMsgWritten += (writtenMessages - sessStat.lastMessageWrite);
282: tmpMsgRead += (readMessages - sessStat.lastMessageRead);
283: tmpBytesWritten += (writtenBytes - sessStat.lastByteWrite);
284: tmpBytesRead += (readBytes - sessStat.lastByteRead);
285: tmpScheduledWrites += session
286: .getScheduledWriteRequests();
287:
288: ExecutorFilter executorFilter = (ExecutorFilter) session
289: .getFilterChain()
290: .get(ExecutorThreadModel.class.getName());
291: if (executorFilter != null) {
292: tmpQueuevedEvents += executorFilter
293: .getEventQueueSize(session);
294: }
295:
296: sessStat.lastByteRead = readBytes;
297: sessStat.lastByteWrite = writtenBytes;
298: sessStat.lastMessageRead = readMessages;
299: sessStat.lastMessageWrite = writtenMessages;
300:
301: }
302:
303: totalMsgWritten.addAndGet(tmpMsgWritten);
304: totalMsgRead.addAndGet(tmpMsgRead);
305: totalBytesWritten.addAndGet(tmpBytesWritten);
306: totalBytesRead.addAndGet(tmpBytesRead);
307: totalScheduledWrites.set(tmpScheduledWrites);
308: totalQueuedEvents.set(tmpQueuevedEvents);
309: }
310: }
311: }
312: }
|