001: /*
002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket;
022:
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
031:
032: /**
033: * implementation of the {@link IDispatcher} <br><br>
034: * All dispatcher methods are thread save.
035: *
036: * <br/><br/><b>This is a xSocket internal class and subject to change</b>
037: *
038: * @author grro@xsocket.org
039: */
040: public class Dispatcher<T extends IHandle> implements IDispatcher<T> {
041:
042: private static final Logger LOG = Logger.getLogger(Dispatcher.class
043: .getName());
044:
045: private static final int SELECTOR_TIMEOUT = 5 * 1000;
046: private static final long TIMEOUT_SHUTDOWN_MILLIS = 5L * 1000L;
047:
048: // is open flag
049: private volatile boolean isOpen = true;
050:
051: // guard object for synchronizing
052: private final Object dispatcherThreadGuard = new Object();
053:
054: // connection handling
055: private Selector selector = null;
056:
057: // event handler
058: private IDispatcherEventHandler<T> eventHandler = null;
059:
060: // statistics
061: private long statisticsStartTime = System.currentTimeMillis();
062: private long handledRegistractions = 0;
063: private long handledReads = 0;
064: private long handledWrites = 0;
065:
066: /**
067: * constructor
068: *
069: * @param eventHandler the assigned event handler
070: */
071: public Dispatcher(IDispatcherEventHandler<T> eventHandler) {
072: assert (eventHandler != null) : "null is not allowed for event handler ";
073:
074: this .eventHandler = eventHandler;
075:
076: if (LOG.isLoggable(Level.FINE)) {
077: LOG.fine("dispatcher " + this .hashCode()
078: + " has been created (eventHandler=" + eventHandler
079: + ")");
080: }
081:
082: try {
083: selector = Selector.open();
084: } catch (IOException ioe) {
085: String text = "exception occured while opening selector. Reason: "
086: + ioe.toString();
087: LOG.severe(text);
088: throw new RuntimeException(text, ioe);
089: }
090: }
091:
092: /**
093: * {@inheritDoc}
094: */
095: public final IDispatcherEventHandler<T> getEventHandler() {
096: return eventHandler;
097: }
098:
099: /**
100: * {@inheritDoc}
101: */
102: public void register(T handle, int ops) throws IOException {
103: assert (!handle.getChannel().isBlocking());
104:
105: if (LOG.isLoggable(Level.FINE)) {
106: LOG.fine("register handle " + handle);
107: }
108:
109: synchronized (dispatcherThreadGuard) {
110: selector.wakeup();
111:
112: handle.getChannel().register(selector, ops, handle);
113: eventHandler.onHandleRegisterEvent(handle);
114: }
115:
116: handledRegistractions++;
117: }
118:
119: /**
120: * {@inheritDoc}
121: */
122: public void deregister(final T handle) throws IOException {
123:
124: synchronized (dispatcherThreadGuard) {
125: selector.wakeup();
126:
127: SelectionKey key = handle.getChannel().keyFor(selector);
128: if (key != null) {
129: if (key.isValid()) {
130: key.cancel();
131: }
132: }
133: }
134: }
135:
136: /**
137: * {@inheritDoc}
138: */
139: @SuppressWarnings("unchecked")
140: public final Set<T> getRegistered() {
141:
142: Set<T> registered = new HashSet<T>();
143:
144: if (selector != null) {
145: SelectionKey[] selKeys = null;
146: synchronized (dispatcherThreadGuard) {
147: selector.wakeup();
148:
149: Set<SelectionKey> keySet = selector.keys();
150: selKeys = keySet
151: .toArray(new SelectionKey[keySet.size()]);
152: }
153:
154: try {
155: for (SelectionKey key : selKeys) {
156: T handle = (T) key.attachment();
157: registered.add(handle);
158: }
159: } catch (Exception ignore) {
160: }
161: }
162:
163: return registered;
164: }
165:
166: /**
167: * {@inheritDoc}
168: */
169: public final void updateInterestSet(T handle, int ops)
170: throws IOException {
171: SelectionKey key = handle.getChannel().keyFor(selector);
172:
173: if (key != null) {
174: synchronized (dispatcherThreadGuard) {
175: if (key.isValid()) {
176: key.selector().wakeup();
177:
178: // if (LOG.isLoggable(Level.FINER)) {
179: // LOG.finer("updating interest ops for " + handle + ". current value is " + printSelectionKeyValue(key.interestOps()));
180: // }
181:
182: key.interestOps(ops);
183:
184: // if (LOG.isLoggable(Level.FINE)) {
185: // LOG.fine("interest ops has been updated to " + printSelectionKeyValue(ops));
186: // }
187: } else {
188: throw new IOException("handle " + handle
189: + " is invalid ");
190: }
191: }
192: }
193: }
194:
195: /**
196: * {@inheritDoc}
197: */
198: @SuppressWarnings("unchecked")
199: public final void run() {
200:
201: if (LOG.isLoggable(Level.FINE)) {
202: LOG.fine("selector listening ...");
203: }
204:
205: while (isOpen) {
206: try {
207:
208: // see http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf
209: synchronized (dispatcherThreadGuard) {
210: /* suspend the dispatcher thread */
211: }
212:
213: int eventCount = selector.select(SELECTOR_TIMEOUT);
214:
215: // handle read write events
216: if (eventCount > 0) {
217: Set selectedEventKeys = selector.selectedKeys();
218: Iterator it = selectedEventKeys.iterator();
219:
220: // handle read & write
221: while (it.hasNext()) {
222:
223: SelectionKey eventKey = (SelectionKey) it
224: .next();
225: it.remove();
226:
227: T handle = (T) eventKey.attachment();
228:
229: // read data
230: if (eventKey.isValid() && eventKey.isReadable()) {
231:
232: // notify event handler
233: try {
234: eventHandler
235: .onHandleReadableEvent(handle);
236: } catch (Exception e) {
237: LOG
238: .warning("["
239: + Thread
240: .currentThread()
241: .getName()
242: + "] exception occured while handling readable event. Reason "
243: + e.toString());
244: }
245:
246: handledReads++;
247: }
248:
249: // write data
250: if (eventKey.isValid() && eventKey.isWritable()) {
251: handledWrites++;
252:
253: // notify event handler
254: try {
255: eventHandler
256: .onHandleWriteableEvent(handle);
257: } catch (Exception e) {
258: LOG
259: .warning("["
260: + Thread
261: .currentThread()
262: .getName()
263: + "] exception occured while handling writeable event. Reason "
264: + e.toString());
265: }
266: }
267: }
268: }
269:
270: } catch (Exception e) {
271: LOG
272: .warning("["
273: + Thread.currentThread().getName()
274: + "] exception occured while processing. Reason "
275: + e.toString());
276: }
277: }
278:
279: closeDispatcher();
280: }
281:
282: @SuppressWarnings("unchecked")
283: private void closeDispatcher() {
284: LOG.fine("closing connections");
285:
286: if (selector != null) {
287: try {
288: selector.close();
289: } catch (Exception e) {
290: if (LOG.isLoggable(Level.FINE)) {
291: LOG
292: .fine("error occured by close selector within tearDown "
293: + e.toString());
294: }
295: }
296: }
297: }
298:
299: /**
300: * {@inheritDoc}
301: */
302: public void close() {
303: if (isOpen) {
304: if (selector != null) {
305:
306: // initiate closing of open connections
307: Set<T> openHandles = getRegistered();
308: int openConnections = openHandles.size();
309:
310: for (T handle : openHandles) {
311: eventHandler.onDispatcherCloseEvent(handle);
312: }
313:
314: // start closer thread
315: new Thread(new Closer(openConnections)).start();
316: }
317: }
318: }
319:
320: /**
321: * check if this dispatcher is open
322: * @return true, if the disptacher is open
323: */
324: public final boolean isOpen() {
325: return isOpen;
326: }
327:
328: /**
329: * {@inheritDoc}
330: */
331: public long getNumberOfHandledRegistrations() {
332: return handledRegistractions;
333: }
334:
335: /**
336: * {@inheritDoc}
337: */
338: public long getNumberOfHandledReads() {
339: return handledReads;
340: }
341:
342: /**
343: * {@inheritDoc}
344: */
345: public long getNumberOfHandledWrites() {
346: return handledWrites;
347: }
348:
349: public void resetStatistics() {
350: statisticsStartTime = System.currentTimeMillis();
351:
352: handledRegistractions = 0;
353: handledReads = 0;
354: handledWrites = 0;
355: }
356:
357: protected long getStatisticsStartTime() {
358: return statisticsStartTime;
359: }
360:
361: /*
362: private String printSelectionKeyValue(int key) {
363:
364: StringBuilder sb = new StringBuilder();
365:
366: if ((key & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
367: sb.append("OP_ACCEPT, ");
368: }
369:
370: if ((key & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) {
371: sb.append("OP_CONNECT, ");
372: }
373:
374: if ((key & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
375: sb.append("OP_WRITE, ");
376: }
377:
378: if ((key & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
379: sb.append("OP_READ, ");
380: }
381:
382: String txt = sb.toString();
383: txt = txt.trim();
384:
385: if (txt.length() > 0) {
386: txt = txt.substring(0, txt.length() - 1);
387: }
388:
389: return txt + " (" + key + ")";
390: }
391: */
392:
393: private class Closer implements Runnable {
394:
395: private int openConnections = 0;
396:
397: public Closer(int openConnections) {
398: this .openConnections = openConnections;
399: }
400:
401: public void run() {
402: Thread.currentThread().setName("xDispatcherCloser");
403:
404: long start = System.currentTimeMillis();
405:
406: int terminatedConnections = 0;
407: do {
408: try {
409: Thread.sleep(100);
410: } catch (InterruptedException ignore) {
411: }
412:
413: if (System.currentTimeMillis() > (start + TIMEOUT_SHUTDOWN_MILLIS)) {
414: LOG
415: .warning("shutdown timeout reached ("
416: + DataConverter
417: .toFormatedDuration(TIMEOUT_SHUTDOWN_MILLIS)
418: + "). kill pending connections");
419: for (SelectionKey sk : selector.keys()) {
420: try {
421: terminatedConnections++;
422: sk.channel().close();
423: } catch (Exception ignore) {
424: }
425: }
426:
427: break;
428: }
429: } while (getRegistered().size() > 0);
430:
431: isOpen = false;
432: // wake up selector, so that isRunning-loop can be terminated
433: selector.wakeup();
434:
435: if ((openConnections > 0) || (terminatedConnections > 0)) {
436: if ((openConnections > 0)
437: && (terminatedConnections > 0)) {
438: LOG
439: .info((openConnections - terminatedConnections)
440: + " connections has been closed properly, "
441: + terminatedConnections
442: + " connections has been terminate unclean");
443: }
444: }
445:
446: if (LOG.isLoggable(Level.FINE)) {
447: LOG.fine("dispatcher "
448: + this .hashCode()
449: + " has been closed (shutdown time = "
450: + DataConverter.toFormatedDuration(System
451: .currentTimeMillis()
452: - start) + ")");
453: }
454: }
455: }
456: }
|