001: /*
002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection.spi;
022:
023: import java.io.IOException;
024: import java.nio.channels.ClosedSelectorException;
025: import java.util.Set;
026: import java.util.TimerTask;
027: import java.util.logging.Level;
028: import java.util.logging.Logger;
029:
030: import org.xsocket.DataConverter;
031: import org.xsocket.Dispatcher;
032: import org.xsocket.IDispatcherEventHandler;
033:
034: final class IoSocketDispatcher extends Dispatcher<IoSocketHandler> {
035:
036: private static final Logger LOG = Logger
037: .getLogger(IoSocketDispatcher.class.getName());
038:
039: static final String DISPATCHER_PREFIX = "xDispatcher";
040:
041: // watch dog
042: private static final long DEFAULT_WATCHDOG_PERIOD_MILLIS = 5L * 60L * 1000L;
043: private long watchDogPeriod = DEFAULT_WATCHDOG_PERIOD_MILLIS;
044: private TimerTask watchDogTask = null;
045:
046: private IMemoryManager memoryManager = null;
047:
048: // statistics
049: private long countIdleTimeouts = 0;
050: private long countConnectionTimeouts = 0;
051:
052: IoSocketDispatcher(IMemoryManager memoryManager) {
053: super (new DispatcherEventHandler(memoryManager));
054:
055: this .memoryManager = memoryManager;
056: updateTimeoutCheckPeriod(DEFAULT_WATCHDOG_PERIOD_MILLIS);
057: }
058:
059: long getReceiveRateBytesPerSec() {
060: return ((DispatcherEventHandler) getEventHandler())
061: .getReceiveRateBytesPerSec();
062: }
063:
064: long getSendRateBytesPerSec() {
065: return ((DispatcherEventHandler) getEventHandler())
066: .getSendRateBytesPerSec();
067: }
068:
069: public void resetStatistics() {
070: super .resetStatistics();
071:
072: countIdleTimeouts = 0;
073: countConnectionTimeouts = 0;
074: }
075:
076: @Override
077: public void register(IoSocketHandler handle, int ops)
078: throws IOException {
079: handle.setMemoryManager(memoryManager);
080: super .register(handle, ops);
081: }
082:
083: /**
084: * {@inheritDoc}
085: */
086: public long getNumberOfHandledRegistrations() {
087: return super .getNumberOfHandledRegistrations();
088: }
089:
090: /**
091: * {@inheritDoc}
092: */
093: public long getNumberOfHandledReads() {
094: return super .getNumberOfHandledReads();
095: }
096:
097: /**
098: * {@inheritDoc}
099: */
100: public long getNumberOfHandledWrites() {
101: return super .getNumberOfHandledWrites();
102: }
103:
104: public int getPreallocatedReadMemorySize() {
105: return memoryManager.getCurrentSizePreallocatedBuffer();
106: }
107:
108: boolean getReceiveBufferPreallocationMode() {
109: return memoryManager.isPreallocationMode();
110: }
111:
112: void setReceiveBufferPreallocationMode(boolean mode) {
113: memoryManager.setPreallocationMode(mode);
114: }
115:
116: void setReceiveBufferPreallocatedMinSize(Integer minSize) {
117: memoryManager.setPreallocatedMinBufferSize(minSize);
118: }
119:
120: Integer getReceiveBufferPreallocatedMinSize() {
121: if (memoryManager.isPreallocationMode()) {
122: return memoryManager.getPreallocatedMinBufferSize();
123: } else {
124: return null;
125: }
126: }
127:
128: Integer getReceiveBufferPreallocatedSize() {
129: if (memoryManager.isPreallocationMode()) {
130: return memoryManager.gettPreallocationBufferSize();
131: } else {
132: return null;
133: }
134: }
135:
136: void setReceiveBufferPreallocatedSize(Integer size) {
137: memoryManager.setPreallocationBufferSize(size);
138: }
139:
140: boolean getReceiveBufferIsDirect() {
141: return memoryManager.isDirect();
142: }
143:
144: void setReceiveBufferIsDirect(boolean isDirect) {
145: memoryManager.setDirect(isDirect);
146: }
147:
148: public void updateTimeoutCheckPeriod(long requiredMinPeriod) {
149:
150: // if not watch dog already exists and required period is smaller than current one return
151: if ((watchDogTask != null)
152: && (watchDogPeriod <= requiredMinPeriod)) {
153: return;
154: }
155:
156: // set watch dog period
157: watchDogPeriod = requiredMinPeriod;
158: if (LOG.isLoggable(Level.FINE)) {
159: LOG.fine("update dispatcher`s watchdog task "
160: + DataConverter.toFormatedDuration(watchDogPeriod));
161: }
162:
163: // if watchdog task task already exits -> terminate it
164: if (watchDogTask != null) {
165: watchDogTask.cancel();
166: }
167:
168: // create and run new watchdog task
169: watchDogTask = new TimerTask() {
170: @Override
171: public void run() {
172: checkTimeouts();
173: }
174: };
175: DefaultIoProvider.getTimer().schedule(watchDogTask,
176: watchDogPeriod, watchDogPeriod);
177: }
178:
179: @Override
180: public void close() {
181: super .close();
182:
183: if (watchDogTask != null) {
184: watchDogTask.cancel();
185: }
186: }
187:
188: long getCountIdleTimeout() {
189: return countIdleTimeouts;
190: }
191:
192: long getCountConnectionTimeout() {
193: return countConnectionTimeouts;
194: }
195:
196: void checkTimeouts() {
197: try {
198: long current = System.currentTimeMillis();
199: Set<IoSocketHandler> socketHandlers = getRegistered();
200: for (IoSocketHandler socketHandler : socketHandlers) {
201: checkTimeout(socketHandler, current);
202: }
203:
204: } catch (ClosedSelectorException cse) {
205: watchDogTask.cancel();
206:
207: } catch (Exception e) {
208: if (LOG.isLoggable(Level.FINE)) {
209: LOG.fine("error occured: " + e.toString());
210: }
211: }
212: }
213:
214: private void checkTimeout(IoSocketHandler ioSocketHandler,
215: long current) {
216: ioSocketHandler.checkConnection();
217:
218: boolean timeoutOccured = ioSocketHandler
219: .checkIdleTimeout(current);
220: if (timeoutOccured) {
221: countIdleTimeouts++;
222: }
223:
224: timeoutOccured = ioSocketHandler
225: .checkConnectionTimeout(current);
226: if (timeoutOccured) {
227: countConnectionTimeouts++;
228: }
229: }
230:
231: @Override
232: public String toString() {
233: return "open channels " + super .getRegistered().size();
234: }
235:
236: /**
237: * returns if current thread is disptacher thread
238: * @return true, if current thread is a dispatcher thread
239: */
240: static boolean isDispatcherThread() {
241: return Thread.currentThread().getName().startsWith(
242: DISPATCHER_PREFIX);
243: }
244:
245: private static final class DispatcherEventHandler implements
246: IDispatcherEventHandler<IoSocketHandler> {
247:
248: private IMemoryManager memoryManager = null;
249:
250: // statistics
251: private long receivedBytes = 0;
252: private long sentBytes = 0;
253: private long lastRequestReceiveRate = System
254: .currentTimeMillis();
255: private long lastRequestSendRate = System.currentTimeMillis();
256:
257: DispatcherEventHandler(IMemoryManager memoryManager) {
258: this .memoryManager = memoryManager;
259: }
260:
261: IMemoryManager getMemoryManager() {
262: return memoryManager;
263: }
264:
265: long getReceiveRateBytesPerSec() {
266: long rate = 0;
267:
268: long elapsed = System.currentTimeMillis()
269: - lastRequestReceiveRate;
270:
271: if (receivedBytes == 0) {
272: rate = 0;
273:
274: } else if (elapsed == 0) {
275: rate = Long.MAX_VALUE;
276:
277: } else {
278: rate = ((receivedBytes * 1000) / elapsed);
279: }
280:
281: lastRequestReceiveRate = System.currentTimeMillis();
282: receivedBytes = 0;
283:
284: return rate;
285: }
286:
287: long getSendRateBytesPerSec() {
288: long rate = 0;
289:
290: long elapsed = System.currentTimeMillis()
291: - lastRequestSendRate;
292:
293: if (sentBytes == 0) {
294: rate = 0;
295:
296: } else if (elapsed == 0) {
297: rate = Long.MAX_VALUE;
298:
299: } else {
300: rate = ((sentBytes * 1000) / elapsed);
301: }
302:
303: lastRequestSendRate = System.currentTimeMillis();
304: sentBytes = 0;
305:
306: return rate;
307: }
308:
309: public void onHandleReadableEvent(
310: final IoSocketHandler socketIOHandler)
311: throws IOException {
312: long read = socketIOHandler.onReadableEvent();
313: receivedBytes += read;
314: }
315:
316: @SuppressWarnings("unchecked")
317: public void onHandleWriteableEvent(
318: final IoSocketHandler socketIOHandler)
319: throws IOException {
320: long written = socketIOHandler.onWriteableEvent();
321: sentBytes += written;
322: }
323:
324: public void onHandleRegisterEvent(
325: final IoSocketHandler socketIOHandler)
326: throws IOException {
327: socketIOHandler.onConnectEvent();
328: }
329:
330: public void onDispatcherCloseEvent(
331: final IoSocketHandler socketIOHandler) {
332: socketIOHandler.onDispatcherClose();
333: }
334: }
335: }
|