001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket.nbio;
026:
027: import seda.nbio.*;
028: import seda.sandStorm.api.*;
029: import seda.sandStorm.lib.aSocket.*;
030:
031: import java.net.*;
032: import java.io.*;
033: import java.util.*;
034:
035: /**
036: * A SelectSource is an implementation of SourceIF which pulls events from
037: * the operating system via the NBIO SelectSet interface. This can be thought
038: * of as a 'shim' which turns a SelectSet into a SourceIF.
039: *
040: * <p>SelectSource can also "balances" the set of events returned on
041: * subsequent calls to dequeue, in order to avoid biasing the servicing of
042: * underlying O/S events to a particular order. This feature can be
043: * disabled by creating a SelectSource with the boolean flag 'do_balance'.
044: *
045: * @author Matt Welsh
046: */
047: public class SelectSource implements SelectSourceIF {
048:
049: private static final boolean DEBUG = false;
050:
051: private SelectSet selset;
052: private SelectItem ready[];
053: private int ready_offset, ready_size;
054:
055: private boolean do_balance;
056: private final int BALANCER_SEQUENCE_SIZE = 10000;
057: private int balancer_seq[];
058: private int balancer_seq_off;
059: private Object blocker;
060:
061: // XXX MDW HACKING
062: public Object getSelectSet() {
063: return selset;
064: }
065:
066: /**
067: * Create a new empty SelectSource. This SelectSource will perform
068: * event balancing.
069: */
070: public SelectSource() {
071: this (true);
072: }
073:
074: /**
075: * Create a new empty SelectSource.
076: *
077: * @param do_balance Indicates whether this SelectSource should perform
078: * event balancing.
079: */
080: public SelectSource(boolean do_balance) {
081: blocker = new Object();
082: selset = new SelectSet();
083: ready = null;
084: ready_offset = ready_size = 0;
085: this .do_balance = do_balance;
086:
087: if (DEBUG)
088: System.err.println("SelectSource created, do_balance = "
089: + do_balance);
090:
091: if (do_balance)
092: initBalancer();
093: }
094:
095: /**
096: * Register a SelectItem with the SelectSource. The SelectItem should
097: * generally correspond to a Selectable along with a set of event flags
098: * that we wish this SelectSource to test for.
099: *
100: * <p>The user is allowed to modify the event flags in the SelectItem
101: * directly (say, to cause the SelectSource ignore a given SelectItem for
102: * the purposes of future calls to one of the dequeue methods). However,
103: * modifying the event flags may not be synchronous with calls to dequeue -
104: * generally because SelectSource maintains a cache of recently-received
105: * events.
106: *
107: * @see seda.nbio.Selectable
108: */
109: public void register(Object selobj) {
110: if (DEBUG)
111: System.err.println("SelectSource: register " + selobj);
112: if (!(selobj instanceof SelectItem)) {
113: System.err
114: .println("register() called with non SelectItem argument. "
115: + "Should not happen!!");
116: return;
117: }
118: SelectItem sel = (SelectItem) selobj;
119: selset.add(sel);
120: synchronized (blocker) {
121: blocker.notify();
122: }
123: }
124:
125: public Object register(Object sc_obj, int ops) {
126: System.err
127: .println("Double argument register() called on nbio SelectSource. "
128: + "Should not happen!!");
129: return null;
130: }
131:
132: /**
133: * Deregister a SelectItem with this SelectSource.
134: * Note that after calling deregister, subsequent calls to dequeue
135: * may in fact return this SelectItem as a result. This is because
136: * the SelectQueue internally caches results.
137: */
138: public void deregister(Object selobj) {
139: if (DEBUG)
140: System.err.println("SelectSource: deregister " + selobj);
141: if (!(selobj instanceof SelectItem)) {
142: System.err
143: .println("deregister() called with non SelectItem argument. "
144: + "Should not happen!!");
145: return;
146: }
147: SelectItem sel = (SelectItem) selobj;
148: selset.remove(sel);
149: synchronized (blocker) {
150: blocker.notify();
151: }
152: }
153:
154: /**
155: * Must be called if the 'events' mask of any SelectItem registered
156: * with this SelectSource changes. Pushes event mask changes down to
157: * the underlying event-dispatch mechanism.
158: */
159: public void update() {
160: selset.update();
161: }
162:
163: /**
164: * Must be called if the 'events' mask of this SelectItem (which
165: * must be registered with this SelectSource) changes. Pushes
166: * event mask changes down to the underlying event-dispatch mechanism.
167: */
168: public void update(Object selobj) {
169: if (!(selobj instanceof SelectItem)) {
170: System.err
171: .println("deregister() called with non SelectItem argument. "
172: + "Should not happen!!");
173: return;
174: }
175: selset.update((SelectItem) selobj);
176: }
177:
178: /**
179: * Return the number of SelectItems registered with the SelectSource.
180: */
181: public int numRegistered() {
182: return selset.size();
183: }
184:
185: /**
186: * Return the number of active SelectItems registered with the SelectSource.
187: * An active SelectItem is one defined as having a non-zero events
188: * interest mask.
189: */
190: public int numActive() {
191: return selset.numActive();
192: }
193:
194: /**
195: * Return the number of elements waiting in the queue (that is,
196: * which don't require a SelectSet poll operation to retrieve).
197: */
198: public int size() {
199: return (ready_size - ready_offset);
200: }
201:
202: /**
203: * Dequeues the next element from the SelectSource without blocking.
204: * Returns null if no entries available.
205: */
206: public QueueElementIF dequeue() {
207: if (selset.size() == 0)
208: return null;
209:
210: if ((ready_size == 0) || (ready_offset == ready_size)) {
211: doPoll(0);
212: }
213: if (ready_size == 0)
214: return null;
215: return new SelectQueueElement(ready[ready_offset++]);
216: }
217:
218: /**
219: * Dequeues all elements which are ready from the SelectSource.
220: * Returns null if no entries available.
221: */
222: public QueueElementIF[] dequeue_all() {
223: if (selset.size() == 0)
224: return null;
225:
226: if ((ready_size == 0) || (ready_offset == ready_size)) {
227: doPoll(0);
228: }
229: if (ready_size == 0)
230: return null;
231: SelectQueueElement ret[] = new SelectQueueElement[ready_size
232: - ready_offset];
233: for (int i = 0; i < ret.length; i++) {
234: ret[i] = new SelectQueueElement(ready[ready_offset++]);
235: }
236: return ret;
237: }
238:
239: /**
240: * Dequeues at most <tt>num</tt> elements which are ready from the
241: * SelectSource. Returns null if no entries available.
242: */
243: public QueueElementIF[] dequeue(int num) {
244: if (selset.size() == 0)
245: return null;
246:
247: if ((ready_size == 0) || (ready_offset == ready_size)) {
248: doPoll(0);
249: }
250: if (ready_size == 0)
251: return null;
252: int numtoret = Math.min(ready_size - ready_offset, num);
253:
254: SelectQueueElement ret[] = new SelectQueueElement[numtoret];
255: for (int i = 0; i < numtoret; i++) {
256: ret[i] = new SelectQueueElement(ready[ready_offset++]);
257: }
258: return ret;
259: }
260:
261: /**
262: * Dequeue the next element from the SelectSource. Blocks up to
263: * timeout_millis milliseconds; returns null if no entries available
264: * after that time. A timeout of -1 blocks forever.
265: */
266: public QueueElementIF blocking_dequeue(int timeout_millis) {
267:
268: if (selset.size() == 0) {
269: if (timeout_millis == 0)
270: return null;
271: // Wait for something to be registered
272: synchronized (blocker) {
273: if (timeout_millis == -1) {
274: try {
275: blocker.wait();
276: } catch (InterruptedException ie) {
277: }
278: } else {
279: try {
280: blocker.wait(timeout_millis);
281: } catch (InterruptedException ie) {
282: }
283: }
284: }
285: }
286:
287: if ((ready_size == 0) || (ready_offset == ready_size)) {
288: doPoll(timeout_millis);
289: }
290: if (ready_size == 0)
291: return null;
292: return new SelectQueueElement(ready[ready_offset++]);
293: }
294:
295: /**
296: * Dequeue a set of elements from the SelectSource. Blocks up to
297: * timeout_millis milliseconds; returns null if no entries available
298: * after that time. A timeout of -1 blocks forever.
299: */
300: public QueueElementIF[] blocking_dequeue_all(int timeout_millis) {
301:
302: if (selset.size() == 0) {
303: if (timeout_millis == 0)
304: return null;
305: // Wait for something to be registered
306: synchronized (blocker) {
307: if (timeout_millis == -1) {
308: try {
309: blocker.wait();
310: } catch (InterruptedException ie) {
311: }
312: } else {
313: try {
314: blocker.wait(timeout_millis);
315: } catch (InterruptedException ie) {
316: }
317: }
318: }
319: }
320:
321: if ((ready_size == 0) || (ready_offset == ready_size)) {
322: doPoll(timeout_millis);
323: }
324: if (ready_size == 0)
325: return null;
326: SelectQueueElement ret[] = new SelectQueueElement[ready_size
327: - ready_offset];
328: for (int i = 0; i < ret.length; i++) {
329: ret[i] = new SelectQueueElement(ready[ready_offset++]);
330: }
331: return ret;
332: }
333:
334: /**
335: * Dequeue a set of elements from the SelectSource. Blocks up to
336: * timeout_millis milliseconds; returns null if no entries available
337: * after that time. A timeout of -1 blocks forever.
338: */
339: public QueueElementIF[] blocking_dequeue(int timeout_millis, int num) {
340:
341: if (selset.size() == 0) {
342: if (timeout_millis == 0)
343: return null;
344: // Wait for something to be registered
345: synchronized (blocker) {
346: if (timeout_millis == -1) {
347: try {
348: blocker.wait();
349: } catch (InterruptedException ie) {
350: }
351: } else {
352: try {
353: blocker.wait(timeout_millis);
354: } catch (InterruptedException ie) {
355: }
356: }
357: }
358: }
359:
360: if ((ready_size == 0) || (ready_offset == ready_size)) {
361: doPoll(timeout_millis);
362: }
363: if (ready_size == 0)
364: return null;
365: int numtoret = Math.min(ready_size - ready_offset, num);
366: SelectQueueElement ret[] = new SelectQueueElement[numtoret];
367: for (int i = 0; i < numtoret; i++) {
368: ret[i] = new SelectQueueElement(ready[ready_offset++]);
369: }
370: return ret;
371: }
372:
373: // Actually performs the poll and sets ready[], ready_off, ready_size
374: private void doPoll(int timeout) {
375: if (DEBUG)
376: System.err.println("SelectSource: Doing poll, timeout "
377: + timeout);
378: int c = selset.select(timeout);
379: if (DEBUG)
380: System.err.println("SelectSource: poll returned " + c);
381: if (c > 0) {
382: SelectItem ret[] = selset.getEvents();
383: if (ret != null) {
384: // XXX We can't get ret == null if doPoll() is synchronized with
385: // deregister() - but I'm not sure I want to do that
386: ready_offset = 0;
387: ready_size = ret.length;
388: balance(ret);
389: return;
390: }
391: }
392: // Didn't get anything
393: ready = null;
394: ready_offset = ready_size = 0;
395: }
396:
397: // Balances selarr[] by shuffling the entries - sets ready[]
398: private void balance(SelectItem selarr[]) {
399: if (DEBUG)
400: System.err
401: .println("SelectSource: balance called, selarr size="
402: + selarr.length);
403: if ((!do_balance) || (selarr.length < 2)) {
404: ready = selarr;
405: } else {
406: SelectItem a;
407: ready = new SelectItem[selarr.length];
408:
409: for (int i = 0; i < ready.length; i++) {
410: if (balancer_seq_off == BALANCER_SEQUENCE_SIZE) {
411: balancer_seq_off = 0;
412: }
413: int n = balancer_seq[balancer_seq_off++]
414: % selarr.length;
415: int c = 0;
416: while (selarr[n] == null) {
417: n++;
418: c++;
419: if (n == selarr.length)
420: n = 0;
421: if (c == selarr.length) {
422: System.err
423: .println("WARNING: SelectSource.balance(): All items in selarr are null (n="
424: + n
425: + ", c="
426: + c
427: + ", len="
428: + selarr.length);
429: for (int k = 0; k < ready.length; k++) {
430: System.err
431: .println("[" + k + "] ready:"
432: + ready[k] + " selarr:"
433: + selarr[k]);
434: }
435: throw new IllegalArgumentException(
436: "balance: All items in selarr are null! This is a bug - please contact mdw@cs.berkeley.edu");
437: }
438: }
439: if (DEBUG)
440: System.err.println("SelectSource: balance: " + n
441: + "->" + i);
442: a = selarr[n];
443: selarr[n] = null;
444: ready[i] = a;
445: }
446: }
447: }
448:
449: // Initialize the balancer
450: private void initBalancer() {
451: balancer_seq = new int[BALANCER_SEQUENCE_SIZE];
452: Random r = new Random(); // XXX Need better seed?
453: for (int i = 0; i < BALANCER_SEQUENCE_SIZE; i++) {
454: balancer_seq[i] = Math.abs(r.nextInt());
455: }
456: balancer_seq_off = 0;
457: }
458:
459: }
|