001: /*
002: * Copyright 2002-2007 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025:
026: /*
027: * @(#)WindowsSelectorImpl.java 1.18 07/05/05
028: */
029:
030: package sun.nio.ch;
031:
032: import java.nio.channels.spi.SelectorProvider;
033: import java.nio.channels.Selector;
034: import java.nio.channels.ClosedSelectorException;
035: import java.nio.channels.Pipe;
036: import java.nio.channels.SelectableChannel;
037: import java.nio.channels.SelectionKey;
038: import java.io.IOException;
039: import java.util.List;
040: import java.util.ArrayList;
041: import java.util.HashMap;
042: import java.util.Iterator;
043:
044: /**
045: * A multi-threaded implementation of Selector for Windows.
046: *
047: * @author Konstantin Kladko
048: * @author Mark Reinhold
049: * @version 1.18, 05/05/07
050: */
051:
052: final class WindowsSelectorImpl extends SelectorImpl {
// Initial capacity of the poll array and of channelArray. Declared static:
// it is a class-wide constant, not per-instance state.
private static final int INIT_CAP = 8;

// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;

// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];

// The global native poll array holds file descriptors and event masks
private PollArrayWrapper pollWrapper;

// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle. Starts at 1 because the constructor places the
// wakeup socket at index 0.
private int totalChannels = 1;

// Number of helper threads needed for select. We need one thread per
// each additional set of MAX_SELECTABLE_FDS - 1 channels. Incremented in
// growIfNeeded() and decremented in implDereg(); the actual thread list
// is brought in line with it by adjustThreadsCount().
private int threadsCount = 0;

// A list of helper threads for select.
private final List<SelectThread> threads = new ArrayList<SelectThread>();

// Pipe used as a wakeup object.
private final Pipe wakeupPipe;

// File descriptors corresponding to source and sink
private final int wakeupSourceFd, wakeupSinkFd;
083:
084: // Maps file descriptors to their indices in pollArray
085: private final static class FdMap extends HashMap<Integer, MapEntry> {
086: private MapEntry get(int desc) {
087: return get(new Integer(desc));
088: }
089:
090: private MapEntry put(SelectionKeyImpl ski) {
091: return put(new Integer(ski.channel.getFDVal()),
092: new MapEntry(ski));
093: }
094:
095: private MapEntry remove(SelectionKeyImpl ski) {
096: Integer fd = new Integer(ski.channel.getFDVal());
097: MapEntry x = get(fd);
098: if ((x != null) && (x.ski.channel == ski.channel))
099: return remove(fd);
100: return null;
101: }
102: }
103:
104: // class for fdMap entries
105: private final static class MapEntry {
106: SelectionKeyImpl ski;
107: long updateCount = 0;
108: long clearedCount = 0;
109:
110: MapEntry(SelectionKeyImpl ski) {
111: this .ski = ski;
112: }
113: }
114:
115: private final FdMap fdMap = new FdMap();
116:
117: // SubSelector for the main thread
118: private final SubSelector subSelector = new SubSelector();
119:
120: private long timeout; //timeout for poll
121:
122: // Lock for interrupt triggering and clearing
123: private final Object interruptLock = new Object();
124: private volatile boolean interruptTriggered = false;
125:
126: WindowsSelectorImpl(SelectorProvider sp) throws IOException {
127: super (sp);
128: pollWrapper = new PollArrayWrapper(INIT_CAP);
129: wakeupPipe = Pipe.open();
130: wakeupSourceFd = ((SelChImpl) wakeupPipe.source()).getFDVal();
131:
132: // Disable the Nagle algorithm so that the wakeup is more immediate
133: SinkChannelImpl sink = (SinkChannelImpl) wakeupPipe.sink();
134: (sink.sc).socket().setTcpNoDelay(true);
135: wakeupSinkFd = ((SelChImpl) sink).getFDVal();
136:
137: pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
138: }
139:
140: protected int doSelect(long timeout) throws IOException {
141: if (channelArray == null)
142: throw new ClosedSelectorException();
143: this .timeout = timeout; // set selector timeout
144: processDeregisterQueue();
145: if (interruptTriggered) {
146: resetWakeupSocket();
147: return 0;
148: }
149: // Calculate number of helper threads needed for poll. If necessary
150: // threads are created here and start waiting on startLock
151: adjustThreadsCount();
152: finishLock.reset(); // reset finishLock
153: // Wakeup helper threads, waiting on startLock, so they start polling.
154: // Redundant threads will exit here after wakeup.
155: startLock.startThreads();
156: // do polling in the main thread. Main thread is responsible for
157: // first MAX_SELECTABLE_FDS entries in pollArray.
158: try {
159: begin();
160: try {
161: subSelector.poll();
162: } catch (IOException e) {
163: finishLock.setException(e); // Save this exception
164: }
165: // Main thread is out of poll(). Wakeup others and wait for them
166: if (threads.size() > 0)
167: finishLock.waitForHelperThreads();
168: } finally {
169: end();
170: }
171: // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
172: finishLock.checkForException();
173: processDeregisterQueue();
174: int updated = updateSelectedKeys();
175: // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
176: resetWakeupSocket();
177: return updated;
178: }
179:
180: // Helper threads wait on this lock for the next poll.
181: private final StartLock startLock = new StartLock();
182:
183: private final class StartLock {
184: // A variable which distinguishes the current run of doSelect from the
185: // previous one. Incrementing runsCounter and notifying threads will
186: // trigger another round of poll.
187: private long runsCounter;
188:
189: // Triggers threads, waiting on this lock to start polling.
190: private synchronized void startThreads() {
191: runsCounter++; // next run
192: notifyAll(); // wake up threads.
193: }
194:
195: // This function is called by a helper thread to wait for the
196: // next round of poll(). It also checks, if this thread became
197: // redundant. If yes, it returns true, notifying the thread
198: // that it should exit.
199: private synchronized boolean waitForStart(SelectThread thread) {
200: while (true) {
201: while (runsCounter == thread.lastRun) {
202: try {
203: startLock.wait();
204: } catch (InterruptedException e) {
205: Thread.currentThread().interrupt();
206: }
207: }
208: if (thread.index >= threads.size()) { // redundant thread
209: return true; // will cause run() to exit.
210: } else {
211: thread.lastRun = runsCounter; // update lastRun
212: return false; // will cause run() to poll.
213: }
214: }
215: }
216: }
217:
218: // Main thread waits on this lock, until all helper threads are done
219: // with poll().
220: private final FinishLock finishLock = new FinishLock();
221:
222: private final class FinishLock {
223: // Number of helper threads, that did not finish yet.
224: private int threadsToFinish;
225:
226: // IOException which occured during the last run.
227: IOException exception = null;
228:
229: // Called before polling.
230: private void reset() {
231: threadsToFinish = threads.size(); // helper threads
232: }
233:
234: // Each helper thread invokes this function on finishLock, when
235: // the thread is done with poll().
236: private synchronized void threadFinished() {
237: if (threadsToFinish == threads.size()) { // finished poll() first
238: // if finished first, wakeup others
239: wakeup();
240: }
241: threadsToFinish--;
242: if (threadsToFinish == 0) // all helper threads finished poll().
243: notify(); // notify the main thread
244: }
245:
246: // The main thread invokes this function on finishLock to wait
247: // for helper threads to finish poll().
248: private synchronized void waitForHelperThreads() {
249: if (threadsToFinish == threads.size()) {
250: // no helper threads finished yet. Wakeup them up.
251: wakeup();
252: }
253: while (threadsToFinish != 0) {
254: try {
255: finishLock.wait();
256: } catch (InterruptedException e) {
257: // Interrupted - set interrupted state.
258: Thread.currentThread().interrupt();
259: }
260: }
261: }
262:
263: // sets IOException for this run
264: private synchronized void setException(IOException e) {
265: exception = e;
266: }
267:
268: // Checks if there was any exception during the last run.
269: // If yes, throws it
270: private void checkForException() throws IOException {
271: if (exception == null)
272: return;
273: StringBuffer message = new StringBuffer(
274: "An exception occured"
275: + " during the execution of select(): \n");
276: message.append(exception);
277: message.append('\n');
278: exception = null;
279: throw new IOException(message.toString());
280: }
281: }
282:
283: private final class SubSelector {
284: private final int pollArrayIndex; // starting index in pollArray to poll
285: // These arrays will hold result of native select().
286: // The first element of each array is the number of selected sockets.
287: // Other elements are file descriptors of selected sockets.
288: private final int[] readFds = new int[MAX_SELECTABLE_FDS + 1];
289: private final int[] writeFds = new int[MAX_SELECTABLE_FDS + 1];
290: private final int[] exceptFds = new int[MAX_SELECTABLE_FDS + 1];
291:
292: private SubSelector() {
293: this .pollArrayIndex = 0; // main thread
294: }
295:
296: private SubSelector(int threadIndex) { // helper threads
297: this .pollArrayIndex = (threadIndex + 1)
298: * MAX_SELECTABLE_FDS;
299: }
300:
301: private int poll() throws IOException { // poll for the main thread
302: return poll0(pollWrapper.pollArrayAddress, Math.min(
303: totalChannels, MAX_SELECTABLE_FDS), readFds,
304: writeFds, exceptFds, timeout);
305: }
306:
307: private int poll(int index) throws IOException {
308: // poll for helper threads
309: return poll0(pollWrapper.pollArrayAddress
310: + (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
311: Math.min(MAX_SELECTABLE_FDS, totalChannels
312: - (index + 1) * MAX_SELECTABLE_FDS),
313: readFds, writeFds, exceptFds, timeout);
314: }
315:
316: private native int poll0(long pollAddress, int numfds,
317: int[] readFds, int[] writeFds, int[] exceptFds,
318: long timeout);
319:
320: private int processSelectedKeys(long updateCount) {
321: int numKeysUpdated = 0;
322: numKeysUpdated += processFDSet(updateCount, readFds,
323: PollArrayWrapper.POLLIN);
324: numKeysUpdated += processFDSet(updateCount, writeFds,
325: PollArrayWrapper.POLLCONN
326: | PollArrayWrapper.POLLOUT);
327: numKeysUpdated += processFDSet(updateCount, exceptFds,
328: PollArrayWrapper.POLLIN | PollArrayWrapper.POLLCONN
329: | PollArrayWrapper.POLLOUT);
330: return numKeysUpdated;
331: }
332:
333: /**
334: * Note, clearedCount is used to determine if the readyOps have
335: * been reset in this select operation. updateCount is used to
336: * tell if a key has been counted as updated in this select
337: * operation.
338: *
339: * me.updateCount <= me.clearedCount <= updateCount
340: */
341: private int processFDSet(long updateCount, int[] fds, int rOps) {
342: int numKeysUpdated = 0;
343: for (int i = 1; i <= fds[0]; i++) {
344: int desc = fds[i];
345: if (desc == wakeupSourceFd) {
346: synchronized (interruptLock) {
347: interruptTriggered = true;
348: }
349: continue;
350: }
351: MapEntry me = fdMap.get(desc);
352: // If me is null, the key was deregistered in the previous
353: // processDeregisterQueue.
354: if (me == null)
355: continue;
356: SelectionKeyImpl sk = me.ski;
357: if (selectedKeys.contains(sk)) { // Key in selected set
358: if (me.clearedCount != updateCount) {
359: if (sk.channel
360: .translateAndSetReadyOps(rOps, sk)
361: && (me.updateCount != updateCount)) {
362: me.updateCount = updateCount;
363: numKeysUpdated++;
364: }
365: } else { // The readyOps have been set; now add
366: if (sk.channel.translateAndUpdateReadyOps(rOps,
367: sk)
368: && (me.updateCount != updateCount)) {
369: me.updateCount = updateCount;
370: numKeysUpdated++;
371: }
372: }
373: me.clearedCount = updateCount;
374: } else { // Key is not in selected set yet
375: if (me.clearedCount != updateCount) {
376: sk.channel.translateAndSetReadyOps(rOps, sk);
377: if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
378: selectedKeys.add(sk);
379: me.updateCount = updateCount;
380: numKeysUpdated++;
381: }
382: } else { // The readyOps have been set; now add
383: sk.channel.translateAndUpdateReadyOps(rOps, sk);
384: if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
385: selectedKeys.add(sk);
386: me.updateCount = updateCount;
387: numKeysUpdated++;
388: }
389: }
390: me.clearedCount = updateCount;
391: }
392: }
393: return numKeysUpdated;
394: }
395: }
396:
397: // Represents a helper thread used for select.
398: private final class SelectThread extends Thread {
399: private int index; // index of this thread
400: SubSelector subSelector;
401: private long lastRun = 0; // last run number
402:
403: // Creates a new thread
404: private SelectThread(int i) {
405: this .index = i;
406: this .subSelector = new SubSelector(i);
407: //make sure we wait for next round of poll
408: this .lastRun = startLock.runsCounter;
409: }
410:
411: public void run() {
412: while (true) { // poll loop
413: // wait for the start of poll. If this thread has become
414: // redundant, then exit.
415: if (startLock.waitForStart(this ))
416: return;
417: // call poll()
418: try {
419: subSelector.poll(index);
420: } catch (IOException e) {
421: // Save this exception and let other threads finish.
422: finishLock.setException(e);
423: }
424: // notify main thread, that this thread has finished, and
425: // wakeup others, if this thread is the first to finish.
426: finishLock.threadFinished();
427: }
428: }
429: }
430:
431: // After some channels registered/deregistered, the number of required
432: // helper threads may have changed. Adjust this number.
433: private void adjustThreadsCount() {
434: if (threadsCount > threads.size()) {
435: // More threads needed. Start more threads.
436: for (int i = threads.size(); i < threadsCount; i++) {
437: SelectThread newThread = new SelectThread(i);
438: threads.add(newThread);
439: newThread.setDaemon(true);
440: newThread.start();
441: }
442: } else if (threadsCount < threads.size()) {
443: // Some threads become redundant. Remove them from the threads List.
444: for (int i = threads.size() - 1; i >= threadsCount; i--)
445: threads.remove(i);
446: }
447: }
448:
449: // Sets Windows wakeup socket to a signaled state.
450: private void setWakeupSocket() {
451: setWakeupSocket0(wakeupSinkFd);
452: }
453:
454: private native void setWakeupSocket0(int wakeupSinkFd);
455:
456: // Sets Windows wakeup socket to a non-signaled state.
457: private void resetWakeupSocket() {
458: synchronized (interruptLock) {
459: if (interruptTriggered == false)
460: return;
461: resetWakeupSocket0(wakeupSourceFd);
462: interruptTriggered = false;
463: }
464: }
465:
466: private native void resetWakeupSocket0(int wakeupSourceFd);
467:
468: // We increment this counter on each call to updateSelectedKeys()
469: // each entry in SubSelector.fdsMap has a memorized value of
470: // updateCount. When we increment numKeysUpdated we set updateCount
471: // for the corresponding entry to its current value. This is used to
472: // avoid counting the same key more than once - the same key can
473: // appear in readfds and writefds.
474: private long updateCount = 0;
475:
476: // Update ops of the corresponding Channels. Add the ready keys to the
477: // ready queue.
478: private int updateSelectedKeys() {
479: updateCount++;
480: int numKeysUpdated = 0;
481: numKeysUpdated += subSelector.processSelectedKeys(updateCount);
482: Iterator it = threads.iterator();
483: while (it.hasNext())
484: numKeysUpdated += ((SelectThread) it.next()).subSelector
485: .processSelectedKeys(updateCount);
486: return numKeysUpdated;
487: }
488:
489: protected void implClose() throws IOException {
490: if (channelArray != null) {
491: if (pollWrapper != null) {
492: // prevent further wakeup
493: synchronized (interruptLock) {
494: interruptTriggered = true;
495: }
496: wakeupPipe.sink().close();
497: wakeupPipe.source().close();
498: for (int i = 1; i < totalChannels; i++) { // Deregister channels
499: if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
500: deregister(channelArray[i]);
501: SelectableChannel selch = channelArray[i]
502: .channel();
503: if (!selch.isOpen() && !selch.isRegistered())
504: ((SelChImpl) selch).kill();
505: }
506: }
507: pollWrapper.free();
508: pollWrapper = null;
509: selectedKeys = null;
510: channelArray = null;
511: threads.clear();
512: // Call startThreads. All remaining helper threads now exit,
513: // since threads.size() = 0;
514: startLock.startThreads();
515: }
516: }
517: }
518:
519: protected void implRegister(SelectionKeyImpl ski) {
520: growIfNeeded();
521: channelArray[totalChannels] = ski;
522: ski.setIndex(totalChannels);
523: fdMap.put(ski);
524: keys.add(ski);
525: pollWrapper.addEntry(totalChannels, ski);
526: totalChannels++;
527: }
528:
529: private void growIfNeeded() {
530: if (channelArray.length == totalChannels) {
531: int newSize = totalChannels * 2; // Make a larger array
532: SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
533: System.arraycopy(channelArray, 1, temp, 1,
534: totalChannels - 1);
535: channelArray = temp;
536: pollWrapper.grow(newSize);
537: }
538: if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
539: pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
540: totalChannels++;
541: threadsCount++;
542: }
543: }
544:
545: protected void implDereg(SelectionKeyImpl ski) throws IOException {
546: int i = ski.getIndex();
547: assert (i >= 0);
548: if (i != totalChannels - 1) {
549: // Copy end one over it
550: SelectionKeyImpl endChannel = channelArray[totalChannels - 1];
551: channelArray[i] = endChannel;
552: endChannel.setIndex(i);
553: pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
554: pollWrapper, i);
555: }
556: channelArray[totalChannels - 1] = null;
557: totalChannels--;
558: ski.setIndex(-1);
559: if (totalChannels != 1
560: && totalChannels % MAX_SELECTABLE_FDS == 1) {
561: totalChannels--;
562: threadsCount--; // The last thread has become redundant.
563: }
564: fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
565: keys.remove(ski);
566: selectedKeys.remove(ski);
567: deregister(ski);
568: SelectableChannel selch = ski.channel();
569: if (!selch.isOpen() && !selch.isRegistered())
570: ((SelChImpl) selch).kill();
571: }
572:
573: void putEventOps(SelectionKeyImpl sk, int ops) {
574: pollWrapper.putEventOps(sk.getIndex(), ops);
575: }
576:
577: public Selector wakeup() {
578: synchronized (interruptLock) {
579: if (!interruptTriggered) {
580: setWakeupSocket();
581: interruptTriggered = true;
582: }
583: }
584: return this ;
585: }
586:
// Runs once when the class is initialized.
// NOTE(review): Util.load() presumably loads the native library that
// implements poll0, setWakeupSocket0 and resetWakeupSocket0 - confirm.
static {
    Util.load();
}
590: }
|