001: package org.mortbay.io.nio;
002:
003: import java.io.IOException;
004: import java.nio.channels.CancelledKeyException;
005: import java.nio.channels.ClosedChannelException;
006: import java.nio.channels.SelectionKey;
007: import java.nio.channels.Selector;
008: import java.nio.channels.ServerSocketChannel;
009: import java.nio.channels.SocketChannel;
010: import java.util.ArrayList;
011: import java.util.ConcurrentModificationException;
012: import java.util.Iterator;
013: import java.util.List;
014:
015: import org.mortbay.component.AbstractLifeCycle;
016: import org.mortbay.io.Connection;
017: import org.mortbay.io.EndPoint;
018: import org.mortbay.log.Log;
019: import org.mortbay.thread.Timeout;
020:
021: /* ------------------------------------------------------------ */
022: /**
023: * The Selector Manager manages and number of SelectSets to allow
024: * NIO scheduling to scale to large numbers of connections.
025: *
026: * @author gregw
027: *
028: */
029: public abstract class SelectorManager extends AbstractLifeCycle {
030: private boolean _delaySelectKeyUpdate = true;
031: private long _maxIdleTime;
032: private transient SelectSet[] _selectSet;
033: private int _selectSets = 1;
034:
035: /* ------------------------------------------------------------ */
036: /**
037: * @return
038: */
039: public long getMaxIdleTime() {
040: return _maxIdleTime;
041: }
042:
043: public int getSelectSets() {
044: return _selectSets;
045: }
046:
047: public boolean isDelaySelectKeyUpdate() {
048: return _delaySelectKeyUpdate;
049: }
050:
051: public SelectionKey register(ServerSocketChannel acceptChannel,
052: int op_accept) throws ClosedChannelException {
053: int set = 0; // TODO next set?
054:
055: synchronized (_selectSet[set]) {
056: SelectionKey key = acceptChannel.register(_selectSet[set]
057: .getSelector(), SelectionKey.OP_ACCEPT);
058: return key;
059: }
060: // TODO Auto-generated method stub
061:
062: }
063:
064: public void doSelect(int acceptorID) throws IOException {
065:
066: if (_selectSet != null && _selectSet.length > acceptorID
067: && _selectSet[acceptorID] != null)
068: _selectSet[acceptorID].doSelect();
069:
070: }
071:
072: public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate) {
073: _delaySelectKeyUpdate = delaySelectKeyUpdate;
074: }
075:
076: /* ------------------------------------------------------------ */
077: /**
078: * @param maxIdleTime
079: */
080: public void setMaxIdleTime(long maxIdleTime) {
081: _maxIdleTime = maxIdleTime;
082: }
083:
084: public void setSelectSets(int selectSets) {
085: _selectSets = selectSets;
086: }
087:
088: /* ------------------------------------------------------------ */
089: /**
090: * @param key
091: * @return
092: * @throws IOException
093: */
094: protected abstract SocketChannel acceptChannel(SelectionKey key)
095: throws IOException;
096:
097: /* ------------------------------------------------------------------------------- */
098: protected abstract boolean dispatch(Runnable task)
099: throws IOException;
100:
101: protected void doStart() throws Exception {
102: _selectSet = new SelectSet[_selectSets];
103: for (int i = 0; i < _selectSet.length; i++)
104: _selectSet[i] = new SelectSet(i);
105:
106: super .doStart();
107: }
108:
109: /* ------------------------------------------------------------------------------- */
110: protected void doStop() throws Exception {
111: for (int i = 0; i < _selectSet.length; i++)
112: _selectSet[i].stop();
113: super .doStop();
114: _selectSet = null;
115: }
116:
117: /* ------------------------------------------------------------------------------- */
118: public void doStop(int i) throws Exception {
119: _selectSet[i].stop();
120: }
121:
122: /* ------------------------------------------------------------------------------- */
123: private void doDispatch(SelectChannelEndPoint endpoint)
124: throws IOException {
125: boolean dispatch_done = true;
126: try {
127: if (endpoint.dispatch(_delaySelectKeyUpdate)) {
128: dispatch_done = false;
129: dispatch_done = dispatch((Runnable) endpoint);
130: }
131: } finally {
132: if (!dispatch_done) {
133: Log.warn("dispatch failed!");
134: endpoint.undispatch();
135: }
136: }
137: }
138:
139: /* ------------------------------------------------------------ */
140: /**
141: * @param endpoint
142: */
143: protected abstract void endPointClosed(
144: SelectChannelEndPoint endpoint);
145:
146: /* ------------------------------------------------------------ */
147: /**
148: * @param endpoint
149: */
150: protected abstract void endPointOpened(
151: SelectChannelEndPoint endpoint);
152:
153: /* ------------------------------------------------------------------------------- */
154: protected abstract Connection newConnection(SocketChannel channel,
155: SelectChannelEndPoint endpoint);
156:
157: /* ------------------------------------------------------------ */
158: /**
159: * @param channel
160: * @param selectSet
161: * @param sKey
162: * @return
163: * @throws IOException
164: */
165: protected abstract SelectChannelEndPoint newEndPoint(
166: SocketChannel channel, SelectorManager.SelectSet selectSet,
167: SelectionKey sKey) throws IOException;
168:
169: /* ------------------------------------------------------------------------------- */
170: /* ------------------------------------------------------------------------------- */
171: /* ------------------------------------------------------------------------------- */
172: public class SelectSet {
173: private transient int _change;
174: private transient List[] _changes;
175: private transient Timeout _idleTimeout;
176: private transient int _nextSet;
177: private transient Timeout _retryTimeout;
178: private transient Selector _selector;
179: private transient int _setID;
180: private transient boolean _selecting;
181:
182: /* ------------------------------------------------------------ */
183: SelectSet(int acceptorID) throws Exception {
184: _setID = acceptorID;
185:
186: _idleTimeout = new Timeout();
187: _idleTimeout.setDuration(getMaxIdleTime());
188: _retryTimeout = new Timeout();
189: _retryTimeout.setDuration(0L);
190:
191: // create a selector;
192: _selector = Selector.open();
193: _changes = new ArrayList[] { new ArrayList(),
194: new ArrayList() };
195: _change = 0;
196: }
197:
198: /* ------------------------------------------------------------ */
199: public void addChange(Object point) {
200: synchronized (_changes) {
201: _changes[_change].add(point);
202: }
203: }
204:
205: /* ------------------------------------------------------------ */
206: public void cancelIdle(Timeout.Task task) {
207: synchronized (this ) {
208: task.cancel();
209: }
210: }
211:
212: /* ------------------------------------------------------------ */
213: public void doSelect() throws IOException {
214: long idle_next = 0;
215: long retry_next = 0;
216:
217: try {
218: List changes;
219: synchronized (_changes) {
220: changes = _changes[_change];
221: _change = _change == 0 ? 1 : 0;
222: _selecting = true;
223: }
224:
225: // Make any key changes required
226: for (int i = 0; i < changes.size(); i++) {
227: try {
228: Object o = changes.get(i);
229: if (o instanceof EndPoint) {
230: // Update the operatios for a key.
231: SelectChannelEndPoint endpoint = (SelectChannelEndPoint) o;
232: endpoint.syncKey();
233: } else if (o instanceof SocketChannel) {
234: // finish accepting this connection
235: SocketChannel channel = (SocketChannel) o;
236: SelectionKey cKey = channel.register(
237: _selector, SelectionKey.OP_READ);
238: SelectChannelEndPoint endpoint = newEndPoint(
239: channel, this , cKey);
240:
241: if (_delaySelectKeyUpdate)
242: doDispatch(endpoint);
243: } else if (o instanceof Runnable) {
244: dispatch((Runnable) o);
245: } else
246: throw new IllegalArgumentException(o
247: .toString());
248: } catch (CancelledKeyException e) {
249: if (isRunning())
250: Log.warn(e);
251: else
252: Log.debug(e);
253: }
254: }
255: changes.clear();
256:
257: synchronized (this ) {
258: _idleTimeout.setDuration(getMaxIdleTime());
259: idle_next = _idleTimeout.getTimeToNext();
260: retry_next = _retryTimeout.getTimeToNext();
261: }
262:
263: // workout how low to wait in select
264: long wait = 1000L; // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
265: if (wait < 0 || idle_next >= 0 && wait > idle_next)
266: wait = idle_next;
267: if (wait < 0 || retry_next >= 0 && wait > retry_next)
268: wait = retry_next;
269:
270: // Do the select.
271: if (wait > 0)
272: _selector.select(wait);
273: else if (wait == 0)
274: _selector.selectNow();
275: else
276: _selector.select();
277:
278: long now = -1;
279:
280: // have we been destroyed while sleeping\
281: if (_selector == null || !_selector.isOpen())
282: return;
283:
284: // update the timers for task schedule in this loop
285: now = System.currentTimeMillis();
286: _idleTimeout.setNow(now);
287: _retryTimeout.setNow(now);
288:
289: // Look for things to do
290: Iterator iter = _selector.selectedKeys().iterator();
291: while (iter.hasNext()) {
292: SelectionKey key = (SelectionKey) iter.next();
293: iter.remove();
294:
295: try {
296: if (!key.isValid()) {
297: key.cancel();
298: SelectChannelEndPoint endpoint = (SelectChannelEndPoint) key
299: .attachment();
300: if (endpoint != null) {
301: endpoint.close();
302: endPointClosed(endpoint);
303: }
304: continue;
305: }
306:
307: if (key.isAcceptable()) {
308:
309: SocketChannel channel = acceptChannel(key);
310: if (channel == null)
311: continue;
312:
313: channel.configureBlocking(false);
314:
315: // TODO make it reluctant to leave 0
316: _nextSet = ++_nextSet % _selectSet.length;
317:
318: // Is this for this selectset
319: if (_nextSet != _setID) {
320: // nope - give it to another.
321: _selectSet[_nextSet].addChange(channel);
322: _selectSet[_nextSet].wakeup();
323: } else {
324: // bind connections to this select set.
325: SelectionKey cKey = channel.register(
326: _selectSet[_nextSet]
327: .getSelector(),
328: SelectionKey.OP_READ);
329: SelectChannelEndPoint endpoint = newEndPoint(
330: channel, _selectSet[_nextSet],
331: cKey);
332: if (endpoint != null)
333: doDispatch(endpoint);
334: }
335: } else {
336: SelectChannelEndPoint endpoint = (SelectChannelEndPoint) key
337: .attachment();
338: if (endpoint != null)
339: doDispatch(endpoint);
340: }
341:
342: key = null;
343: } catch (CancelledKeyException e) {
344: // TODO investigate if this actually is a problem?
345: if (isRunning())
346: Log.warn(e);
347: else
348: Log.ignore(e);
349: } catch (Exception e) {
350: if (isRunning())
351: Log.warn(e);
352: else
353: Log.ignore(e);
354:
355: if (key != null
356: && !(key.channel() instanceof ServerSocketChannel)
357: && key.isValid()) {
358: key.interestOps(0);
359: key.cancel();
360: }
361: }
362: }
363:
364: // tick over the timers
365: Timeout.Task task = null;
366: synchronized (this ) {
367: now = System.currentTimeMillis();
368: _retryTimeout.setNow(now);
369: _idleTimeout.setNow(now);
370:
371: task = _idleTimeout.expired();
372: if (task == null)
373: task = _retryTimeout.expired();
374: }
375:
376: // handle any expired timers
377: while (task != null) {
378: task.expire();
379:
380: // get the next timer tasks
381: synchronized (this ) {
382: if (_selector == null)
383: break;
384: task = _idleTimeout.expired();
385: if (task == null)
386: task = _retryTimeout.expired();
387: }
388: }
389: } finally {
390: synchronized (this ) {
391: _selecting = false;
392: }
393: }
394: }
395:
396: /* ------------------------------------------------------------ */
397: public SelectorManager getManager() {
398: return SelectorManager.this ;
399: }
400:
401: /* ------------------------------------------------------------ */
402: public long getNow() {
403: return _idleTimeout.getNow();
404: }
405:
406: /* ------------------------------------------------------------ */
407: public void scheduleIdle(Timeout.Task task) {
408: synchronized (this ) {
409: task.schedule(_idleTimeout);
410: }
411: }
412:
413: /* ------------------------------------------------------------ */
414: public void scheduleTimeout(Timeout.Task task, long timeout) {
415: synchronized (this ) {
416: _retryTimeout.schedule(task, timeout);
417: }
418: }
419:
420: /* ------------------------------------------------------------ */
421: public void wakeup() {
422: Selector selector = _selector;
423: if (selector != null)
424: selector.wakeup();
425: }
426:
427: /* ------------------------------------------------------------ */
428: Selector getSelector() {
429: return _selector;
430: }
431:
432: /* ------------------------------------------------------------ */
433: void stop() throws Exception {
434: boolean selecting = true;
435: while (selecting) {
436: wakeup();
437: Thread.yield();
438: synchronized (this ) {
439: selecting = _selecting;
440: }
441: }
442:
443: synchronized (this ) {
444: Iterator iter = _selector.keys().iterator();
445: while (iter.hasNext()) {
446: SelectionKey key = (SelectionKey) iter.next();
447: if (key == null)
448: continue;
449: EndPoint endpoint = (EndPoint) key.attachment();
450: if (endpoint != null) {
451: try {
452: endpoint.close();
453: } catch (IOException e) {
454: Log.ignore(e);
455: }
456: }
457: }
458:
459: _idleTimeout.cancelAll();
460: _retryTimeout.cancelAll();
461: try {
462: if (_selector != null)
463: _selector.close();
464: } catch (IOException e) {
465: Log.ignore(e);
466: }
467: _selector = null;
468: }
469: }
470: }
471: }
|