001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket.nio;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.lib.aSocket.SelectSourceIF;
029:
030: import java.nio.channels.*;
031: import java.net.*;
032: import java.io.*;
033: import java.util.*;
034:
035: /**
036: * A NIOSelectSource is an implementation of SourceIF which pulls events from
037: * the operating system via the NIO Selector interface. This can be thought
038: * of as a 'shim' which turns a Selector into a SourceIF.
039: *
040: * <p>NIOSelectSource also "balances" the set of events returned on
041: * subsequent calls to dequeue, in order to avoid biasing the servicing of
042: * underlying O/S events to a particular order. This feature can be
043: * disabled by creating a SelectSource with the boolean flag 'do_balance' set to false.
044: *
045: * <p><b>Important note:</b> This class is not threadsafe with respect
046: * to multiple threads calling dequeue() or blocking_dequeue() at once.
047: * Clients must synchronize their access to this class.
048: *
049: * @author Matt Welsh
050: */
051: public class NIOSelectSource implements SelectSourceIF {
052:
053: private static final boolean DEBUG = false;
054:
055: private Selector selector;
056: private SelectionKey ready[];
057: private int ready_offset, ready_size;
058:
059: private boolean do_balance;
060: private final int BALANCER_SEQUENCE_SIZE = 10000;
061: private int balancer_seq[];
062: private int balancer_seq_off;
063: private Object blocker;
064: private String name = "(unknown)";
065:
066: // XXX MDW HACKING
067: public Selector getSelector() {
068: return selector;
069: }
070:
071: /**
072: * Create a new empty SelectSource. This SelectSource will perform
073: * event balancing.
074: */
075: public NIOSelectSource() {
076: this (true);
077: }
078:
079: /**
080: * Create a new empty SelectSource.
081: *
082: * @param do_balance Indicates whether this SelectSource should perform
083: * event balancing.
084: */
085: public NIOSelectSource(boolean do_balance) {
086: blocker = new Object();
087: try {
088: selector = Selector.open();
089: } catch (IOException e) {
090: System.err.println("NIOSelectSource (" + name
091: + "): error creating selector: " + e);
092: }
093: ready = null;
094: ready_offset = ready_size = 0;
095: this .do_balance = do_balance;
096:
097: if (DEBUG)
098: System.err.println("NIOSelectSource created, do_balance = "
099: + do_balance);
100:
101: if (do_balance)
102: initBalancer();
103: }
104:
105: /**
106: * Register a SelectItem with the SelectSource. The SelectItem should
107: * generally correspond to a Selectable along with a set of event flags
108: * that we wish this SelectSource to test for.
109: *
110: * <p>The user is allowed to modify the event flags in the SelectItem
111: * directly (say, to cause the SelectSource ignore a given SelectItem for
112: * the purposes of future calls to one of the dequeue methods). However,
113: * modifying the event flags may not be synchronous with calls to dequeue -
114: * generally because SelectSource maintains a cache of recently-received
115: * events.
116: */
117: public Object register(Object nio_sc_obj, int ops) {
118: if (DEBUG)
119: System.err.println("NIOSelectSource (" + name
120: + "): register " + nio_sc_obj + " : " + this );
121:
122: if (!(nio_sc_obj instanceof SelectableChannel)) {
123: System.err
124: .println("register() called with non SelectableChannel argument. "
125: + "Should not happen!!");
126: return null;
127: }
128:
129: SelectableChannel nio_sc = (SelectableChannel) nio_sc_obj;
130:
131: synchronized (blocker) {
132: SelectionKey ret;
133: try {
134: ret = nio_sc.register(selector, ops);
135: } catch (ClosedChannelException cce) {
136: System.err.println("Closed Channel Exception: " + cce);
137: ret = null;
138: }
139: if (DEBUG)
140: System.err.println("returning " + ret);
141: if (DEBUG)
142: System.err.println("numactive = " + numActive());
143: blocker.notify();
144: return ret;
145: }
146: }
147:
148: public void register(Object sel) {
149: System.err
150: .println("Single argument register() called on NIOSelectSource. "
151: + "Should not happen!!");
152: return;
153: }
154:
155: /**
156: * Deregister a SelectItem with this SelectSource.
157: * Note that after calling deregister, subsequent calls to dequeue
158: * may in fact return this SelectItem as a result. This is because
159: * the SelectQueue internally caches results.
160: */
161: public void deregister(Object selkey_obj) {
162: if (DEBUG)
163: System.err.println("NIOSelectSource (" + name
164: + "): deregister " + selkey_obj);
165:
166: if (!(selkey_obj instanceof SelectionKey)) {
167: System.err
168: .println("deregister() called on NIOSelectSource with non SelectionKey "
169: + "argument. Should not happen!!");
170: return;
171: }
172:
173: synchronized (blocker) {
174: SelectionKey selkey = (SelectionKey) selkey_obj;
175: selkey.cancel();
176: /* This must be done so that calls to close() actually close. */
177: try {
178: selector.selectNow();
179: } catch (IOException ioe) {
180: // Ignore
181: }
182: blocker.notify();
183: }
184: }
185:
186: /**
187: * Must be called if the 'events' mask of any SelectItem registered
188: * with this SelectSource changes. Pushes event mask changes down to
189: * the underlying event-dispatch mechanism.
190: */
191: public void update() {
192: // selset.update();
193: }
194:
195: /**
196: * Must be called if the 'events' mask of this SelectItem (which
197: * must be registered with this SelectSource) changes. Pushes
198: * event mask changes down to the underlying event-dispatch mechanism.
199: */
200: public void update(Object sel) {
201: System.err
202: .println("update() called on NIOSelectSource with argument. "
203: + "should not happen!!");
204: return;
205: }
206:
207: /**
208: * Return the number of SelectItems registered with the SelectSource.
209: */
210: public int numRegistered() {
211: return selector.keys().size();
212: }
213:
214: /**
215: * Return the number of active SelectItems registered with the SelectSource.
216: * An active SelectItem is one defined as having a non-zero events
217: * interest mask.
218: */
219: public int numActive() {
220: // does this mean number with a non-zero request mask, or number
221: // in selectedKeys()
222: Iterator key_iter = selector.keys().iterator();
223: SelectionKey sk;
224: int n_active = 0;
225: while (key_iter.hasNext()) {
226: sk = (SelectionKey) key_iter.next();
227: if (sk.isValid() && sk.interestOps() != 0)
228: n_active++;
229: }
230: return n_active;
231: }
232:
233: /**
234: * Return the number of elements waiting in the queue (that is,
235: * which don't require a SelectSet poll operation to retrieve).
236: */
237: public int size() {
238: synchronized (this ) {
239: return (ready_size - ready_offset);
240: }
241: }
242:
243: /**
244: * Dequeues the next element from the SelectSource without blocking.
245: * Returns null if no entries available.
246: */
247: public QueueElementIF dequeue() {
248: if (selector.keys().size() == 0)
249: return null;
250:
251: if ((ready_size == 0) || (ready_offset == ready_size)) {
252: doPoll(0);
253: }
254: if (ready_size == 0)
255: return null;
256: return new NIOSelectorQueueElement(ready[ready_offset++]);
257: }
258:
259: /**
260: * Dequeues all elements which are ready from the SelectSource.
261: * Returns null if no entries available.
262: */
263: public QueueElementIF[] dequeue_all() {
264: if (selector.keys().size() == 0)
265: return null;
266:
267: if ((ready_size == 0) || (ready_offset == ready_size)) {
268: doPoll(0);
269: }
270: if (ready_size == 0)
271: return null;
272: NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[ready_size
273: - ready_offset];
274: for (int i = 0; i < ret.length; i++) {
275: ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);
276: }
277: return ret;
278: }
279:
280: /**
281: * Dequeues at most <tt>num</tt> elements which are ready from the
282: * SelectSource. Returns null if no entries available.
283: */
284: public QueueElementIF[] dequeue(int num) {
285: if (selector.keys().size() == 0)
286: return null;
287:
288: if ((ready_size == 0) || (ready_offset == ready_size)) {
289: doPoll(0);
290: }
291: if (ready_size == 0)
292: return null;
293: int numtoret = Math.min(ready_size - ready_offset, num);
294:
295: NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[numtoret];
296: for (int i = 0; i < numtoret; i++) {
297: ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);
298: }
299: return ret;
300: }
301:
302: /**
303: * Dequeue the next element from the SelectSource. Blocks up to
304: * timeout_millis milliseconds; returns null if no entries available
305: * after that time. A timeout of -1 blocks forever.
306: */
307: public QueueElementIF blocking_dequeue(int timeout_millis) {
308:
309: if (DEBUG)
310: System.err.println("NIOSelectSource (" + name
311: + "): blocking_dequeue called");
312: synchronized (blocker) {
313: if (selector.keys().size() == 0) {
314: if (DEBUG)
315: System.err.println("No keys in selector");
316:
317: if (timeout_millis == 0)
318: return null;
319:
320: // Wait for something to be registered
321: if (timeout_millis == -1) {
322: try {
323: blocker.wait();
324: } catch (InterruptedException ie) {
325: }
326: } else {
327: try {
328: blocker.wait(timeout_millis);
329: } catch (InterruptedException ie) {
330: }
331: }
332: }
333: }
334:
335: if ((ready_size == 0) || (ready_offset == ready_size)) {
336: doPoll(timeout_millis);
337: }
338: if (ready_size == 0) {
339: if (DEBUG)
340: System.err.println("still no ready");
341: return null;
342: }
343: return new NIOSelectorQueueElement(ready[ready_offset++]);
344: }
345:
346: /**
347: * Dequeue a set of elements from the SelectSource. Blocks up to
348: * timeout_millis milliseconds; returns null if no entries available
349: * after that time. A timeout of -1 blocks forever.
350: */
351: public QueueElementIF[] blocking_dequeue_all(int timeout_millis) {
352: if (DEBUG)
353: System.err.println("NIOSelectSource (" + name
354: + "): blocking_dequeue_all called");
355: /* have to do this to retain same semantics as before
356: nio expects 0 for indefinite. there is no way to say
357: don't block at all, so hopefully 1ms isn't noticable to people */
358:
359: synchronized (blocker) {
360: if (selector.keys().size() == 0) {
361: if (DEBUG)
362: System.err.println("!!!!no keys");
363: if (timeout_millis == 0)
364: return null;
365: // Wait for something to be registered
366: if (timeout_millis == -1) {
367: try {
368: blocker.wait();
369: } catch (InterruptedException ie) {
370: }
371: } else {
372: try {
373: blocker.wait(timeout_millis);
374: } catch (InterruptedException ie) {
375: }
376: }
377: }
378: }
379:
380: if ((ready_size == 0) || (ready_offset == ready_size)) {
381: doPoll(timeout_millis);
382: }
383: if (DEBUG)
384: System.err.println("!!!!ready_size=" + ready_size);
385: if (ready_size == 0)
386: return null;
387: if (DEBUG)
388: System.err.println("!!!!ready_size-ready_offset="
389: + (ready_size - ready_offset));
390: NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[ready_size
391: - ready_offset];
392: for (int i = 0; i < ret.length; i++) {
393: if (DEBUG)
394: System.err.println("ret[" + i + "] = "
395: + ready[ready_offset]);
396: ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);
397: }
398: return ret;
399: }
400:
401: /**
402: * Dequeue a set of elements from the SelectSource. Blocks up to
403: * timeout_millis milliseconds; returns null if no entries available
404: * after that time. A timeout of -1 blocks forever.
405: */
406: public QueueElementIF[] blocking_dequeue(int timeout_millis, int num) {
407: if (DEBUG)
408: System.err.println("NIOSelectSource (" + name
409: + "): blocking_dequeue called");
410:
411: synchronized (blocker) {
412: if (selector.keys().size() == 0) {
413: if (timeout_millis == 0)
414: return null;
415: // Wait for something to be registered
416: if (timeout_millis == -1) {
417: try {
418: blocker.wait();
419: } catch (InterruptedException ie) {
420: }
421: } else {
422: try {
423: blocker.wait(timeout_millis);
424: } catch (InterruptedException ie) {
425: }
426: }
427: }
428: }
429:
430: if ((ready_size == 0) || (ready_offset == ready_size)) {
431: doPoll(timeout_millis);
432: }
433: if (ready_size == 0)
434: return null;
435: int numtoret = Math.min(ready_size - ready_offset, num);
436: NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[numtoret];
437: for (int i = 0; i < numtoret; i++) {
438: ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);
439: }
440: return ret;
441: }
442:
443: // Actually performs the poll and sets ready[], ready_off, ready_size
444: //
445: // XXX MDW: There is a race condition here. If multiple threads
446: // call doPoll (e.g., through dequeue()), then the ready set can
447: // get corrupted. The fix is to make this method synchronized, but
448: // this would cause a blocking dequeue() to stall all other (possibly
449: // nonblocking) dequeues until the timeout of the longest blocking
450: // dequeue. I don't see an easy way around this problem since it's
451: // the selector.selectedKeys() set that changes with each call to
452: // selector.select() or selector.selectNow(). The answer is: This class
453: // is not thread-safe!
454:
455: private void doPoll(int timeout) {
456: if (DEBUG)
457: System.err.println("NIOSelectSource (" + name
458: + "): Doing poll, timeout " + timeout);
459:
460: int c = 0;
461: try {
462: // to correct for changed semantics in nio from nbio.
463: // use selectNow to not block, and select(0) for indefinite block
464: if (timeout == 0) {
465: c = selector.selectNow();
466: } else {
467: if (timeout == -1)
468: timeout = 0;
469: c = selector.select(timeout);
470: }
471: } catch (IOException e) {
472: // Essentially ignore the exception (since NBIO SelectSet.select()
473: // doesn't throw any exceptions)
474: if (DEBUG)
475: System.err
476: .println("NIOSelectSource: Error doing select: "
477: + e);
478: }
479: if (DEBUG)
480: System.err.println("NIOSelectSource (" + name
481: + "): poll returned " + c);
482:
483: Set skeys = selector.selectedKeys();
484: if (skeys.size() > 0) {
485:
486: SelectionKey ret[] = new SelectionKey[skeys.size()];
487: Iterator key_iter = skeys.iterator();
488:
489: int j = 0;
490: while (key_iter.hasNext()) {
491: ret[j] = (SelectionKey) key_iter.next();
492: key_iter.remove();
493: //selector.selectedKeys().remove(ret[j]);
494: j++;
495: }
496:
497: if (ret.length != 0) {
498: // XXX We can't get ret == null if doPoll() is synchronized with
499: // deregister() - but I'm not sure I want to do that
500: ready_offset = 0;
501: ready_size = ret.length;
502: balance(ret);
503: return;
504: }
505: }
506: // Didn't get anything
507: ready = null;
508: ready_offset = ready_size = 0;
509: }
510:
511: // Balances selarr[] by shuffling the entries - sets ready[]
512: private void balance(SelectionKey selarr[]) {
513: if (DEBUG)
514: System.err
515: .println("NIOSelectSource (" + name
516: + "): balance called, selarr size="
517: + selarr.length);
518:
519: for (int i = 0; i < selarr.length; i++)
520: if (DEBUG)
521: System.err.println("!!!!selar[" + i + "] = "
522: + selarr[i]);
523:
524: if ((!do_balance) || (selarr.length < 2)) {
525: ready = selarr;
526: } else {
527: SelectionKey a;
528: ready = new SelectionKey[selarr.length];
529:
530: for (int i = 0; i < ready.length; i++) {
531: if (balancer_seq_off == BALANCER_SEQUENCE_SIZE) {
532: balancer_seq_off = 0;
533: }
534: int n = balancer_seq[balancer_seq_off++]
535: % selarr.length;
536: int c = 0;
537: while (selarr[n] == null) {
538: n++;
539: c++;
540: if (n == selarr.length)
541: n = 0;
542: if (c == selarr.length) {
543: System.err
544: .println("WARNING: NIOSelectSource.balance(): All items in selarr are null (n="
545: + n
546: + ", c="
547: + c
548: + ", len="
549: + selarr.length);
550: for (int k = 0; k < ready.length; k++) {
551: System.err
552: .println("[" + k + "] ready:"
553: + ready[k] + " selarr:"
554: + selarr[k]);
555: }
556: throw new IllegalArgumentException(
557: "balance: All items in selarr are null! This is a bug - please contact mdw@cs.berkeley.edu");
558: }
559: }
560: if (DEBUG)
561: System.err.println("NIOSelectSource: balance: " + n
562: + "->" + i);
563: a = selarr[n];
564: selarr[n] = null;
565: ready[i] = a;
566: }
567: }
568: }
569:
570: // Initialize the balancer
571: private void initBalancer() {
572: balancer_seq = new int[BALANCER_SEQUENCE_SIZE];
573: Random r = new Random(); // XXX Need better seed?
574: for (int i = 0; i < BALANCER_SEQUENCE_SIZE; i++) {
575: balancer_seq[i] = Math.abs(r.nextInt());
576: }
577: balancer_seq_off = 0;
578: }
579:
580: void setName(String thename) {
581: this .name = thename;
582: }
583:
584: public String toString() {
585: return "NIOSS(" + name + ")";
586: }
587:
588: }
|