001: /* Licensed to the Apache Software Foundation (ASF) under one or more
002: * contributor license agreements. See the NOTICE file distributed with
003: * this work for additional information regarding copyright ownership.
004: * The ASF licenses this file to You under the Apache License, Version 2.0
005: * (the "License"); you may not use this file except in compliance with
006: * the License. You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.harmony.nio.internal;
017:
018: import java.io.FileDescriptor;
019: import java.io.IOException;
020: import java.nio.ByteBuffer;
021: import java.nio.channels.ClosedSelectorException;
022: import java.nio.channels.IllegalSelectorException;
023: import java.nio.channels.Pipe;
024: import java.nio.channels.SelectableChannel;
025: import java.nio.channels.SelectionKey;
026: import java.nio.channels.Selector;
027: import java.nio.channels.SocketChannel;
028: import java.nio.channels.spi.AbstractSelectableChannel;
029: import java.nio.channels.spi.AbstractSelectionKey;
030: import java.nio.channels.spi.AbstractSelector;
031: import java.nio.channels.spi.SelectorProvider;
032: import java.util.Arrays;
033: import java.util.Collection;
034: import java.util.Collections;
035: import java.util.HashSet;
036: import java.util.Iterator;
037: import java.util.Set;
038:
039: import org.apache.harmony.luni.platform.FileDescriptorHandler;
040: import org.apache.harmony.luni.platform.Platform;
041:
042: /*
043: * Default implementation of java.nio.channels.Selector
044: */
045: final class SelectorImpl extends AbstractSelector {
046:
047: private static final int MOCK_WRITEBUF_SIZE = 1;
048:
049: private static final int MOCK_READBUF_SIZE = 8;
050:
051: private static final int NA = 0;
052:
053: private static final int READABLE = 1;
054:
055: private static final int WRITEABLE = 2;
056:
057: private static final int SELECT_BLOCK = -1;
058:
059: private static final int SELECT_NOW = 0;
060:
061: /*
062: * keysLock is used to brief synchronization when get selectionKeys snapshot
063: * before selection.
064: */
065: private static class KeysLock {
066: }
067:
068: final Object keysLock = new KeysLock();
069:
070: private SelectionKey[] keys = new SelectionKey[1];
071:
072: private final Set<SelectionKey> keysSet = new HashSet<SelectionKey>();
073:
074: private Set<SelectionKey> unmodifiableKeys = Collections
075: .unmodifiableSet(keysSet);
076:
077: private final Set<SelectionKey> selectedKeys = new HashSet<SelectionKey>();
078:
079: private Set<SelectionKey> unaddableSelectedKeys = new UnaddableSet<SelectionKey>(
080: selectedKeys);
081:
082: // sink and source are used by wakeup()
083: private Pipe.SinkChannel sink;
084:
085: private Pipe.SourceChannel source;
086:
087: private FileDescriptor sourcefd;
088:
089: private FileDescriptor[] readableFDs;
090:
091: private FileDescriptor[] writableFDs;
092:
093: private int lastKeyIndex = -1;
094:
095: private int readableKeysCount = 0;
096:
097: private int writableKeysCount = 0;
098:
099: private int[] keysToReadableFDs;
100:
101: private int[] keysToWritableFDs;
102:
103: private int[] readableFDsToKeys;
104:
105: private int[] writableFDsToKeys;
106:
107: public SelectorImpl(SelectorProvider selectorProvider) {
108: super (selectorProvider);
109: try {
110: Pipe mockSelector = selectorProvider.openPipe();
111: sink = mockSelector.sink();
112: source = mockSelector.source();
113: sourcefd = ((FileDescriptorHandler) source).getFD();
114: source.configureBlocking(false);
115:
116: readableFDs = new FileDescriptor[1];
117: writableFDs = new FileDescriptor[0];
118: keysToReadableFDs = new int[1];
119: keysToWritableFDs = new int[0];
120: readableFDsToKeys = new int[1];
121: writableFDsToKeys = new int[0];
122:
123: // register sink channel
124: readableFDs[0] = sourcefd;
125: keys[0] = source.keyFor(this );
126:
127: // index it
128: keysToReadableFDs[0] = 0;
129: readableFDsToKeys[0] = 0;
130:
131: lastKeyIndex = 0;
132: readableKeysCount = 1;
133: } catch (IOException e) {
134: // do nothing
135: }
136: }
137:
138: /**
139: * @see java.nio.channels.spi.AbstractSelector#implCloseSelector()
140: */
141: @Override
142: protected void implCloseSelector() throws IOException {
143: synchronized (this ) {
144: synchronized (keysSet) {
145: synchronized (selectedKeys) {
146: doCancel();
147: for (SelectionKey sk : keys) {
148: if (sk != null) {
149: deregister((AbstractSelectionKey) sk);
150: }
151: }
152: wakeup();
153: }
154: }
155: }
156: }
157:
158: private void ensureCapacity(int c) {
159: // TODO: rewrite array handling as some internal class
160: if (c >= keys.length) {
161: SelectionKey[] newKeys = new SelectionKey[(keys.length + 1) << 1];
162: System.arraycopy(keys, 0, newKeys, 0, keys.length);
163: keys = newKeys;
164: }
165:
166: if (c >= keysToReadableFDs.length) {
167: int[] newKeysToReadableFDs = new int[(keysToReadableFDs.length + 1) << 1];
168: System.arraycopy(keysToReadableFDs, 0,
169: newKeysToReadableFDs, 0, keysToReadableFDs.length);
170: keysToReadableFDs = newKeysToReadableFDs;
171: }
172:
173: if (c >= keysToWritableFDs.length) {
174: int[] newKeysToWritableFDs = new int[(keysToWritableFDs.length + 1) << 1];
175: System.arraycopy(keysToWritableFDs, 0,
176: newKeysToWritableFDs, 0, keysToWritableFDs.length);
177: keysToWritableFDs = newKeysToWritableFDs;
178: }
179:
180: if (readableKeysCount >= readableFDs.length) {
181: FileDescriptor[] newReadableFDs = new FileDescriptor[(readableFDs.length + 1) << 1];
182: System.arraycopy(readableFDs, 0, newReadableFDs, 0,
183: readableFDs.length);
184: readableFDs = newReadableFDs;
185: }
186:
187: if (readableKeysCount >= readableFDsToKeys.length) {
188: int[] newReadableFDsToKeys = new int[(readableFDsToKeys.length + 1) << 1];
189: System.arraycopy(readableFDsToKeys, 0,
190: newReadableFDsToKeys, 0, readableFDsToKeys.length);
191: readableFDsToKeys = newReadableFDsToKeys;
192: }
193:
194: if (writableKeysCount >= writableFDs.length) {
195: FileDescriptor[] newWriteableFDs = new FileDescriptor[(writableFDs.length + 1) << 1];
196: System.arraycopy(writableFDs, 0, newWriteableFDs, 0,
197: writableFDs.length);
198: writableFDs = newWriteableFDs;
199: }
200:
201: if (writableKeysCount >= writableFDsToKeys.length) {
202: int[] newWritableFDsToKeys = new int[(writableFDsToKeys.length + 1) << 1];
203: System.arraycopy(writableFDsToKeys, 0,
204: newWritableFDsToKeys, 0, writableFDsToKeys.length);
205: writableFDsToKeys = newWritableFDsToKeys;
206: }
207: }
208:
209: private void limitCapacity() {
210: // TODO: implement array squeezing
211: }
212:
213: /**
214: * Adds the specified key to storage and updates the indexes accordingly
215: *
216: * @param sk
217: * key to add
218: * @return index in the storage
219: */
220: private int addKey(SelectionKey sk) {
221:
222: lastKeyIndex++;
223: int c = lastKeyIndex;
224:
225: // make sure that enough space is available
226: ensureCapacity(c);
227:
228: // add to keys storage
229: keys[c] = sk;
230:
231: // cache the fields
232: int ops = sk.interestOps();
233: FileDescriptor fd = ((FileDescriptorHandler) sk.channel())
234: .getFD();
235:
236: // presume that we have no FD associated
237: keysToReadableFDs[c] = -1;
238: keysToWritableFDs[c] = -1;
239:
240: // if readable operations requested
241: if (((SelectionKey.OP_ACCEPT | SelectionKey.OP_READ) & ops) != 0) {
242: // save as readable FD
243: readableFDs[readableKeysCount] = fd;
244:
245: // create index
246: keysToReadableFDs[c] = readableKeysCount;
247: readableFDsToKeys[readableKeysCount] = c;
248:
249: readableKeysCount++;
250: }
251:
252: // if writable operations requested
253: if (((SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE) & ops) != 0) {
254: // save as writable FD
255: writableFDs[writableKeysCount] = fd;
256:
257: // create index
258: keysToWritableFDs[c] = writableKeysCount;
259: writableFDsToKeys[writableKeysCount] = c;
260:
261: writableKeysCount++;
262: }
263:
264: return c;
265: }
266:
267: /**
268: * Deletes the key from the internal storage and updates the indexes
269: * accordingly
270: *
271: * @param sk
272: * key to delete
273: */
274: private void delKey(SelectionKey sk) {
275: int index = ((SelectionKeyImpl) sk).getIndex();
276:
277: // === deleting the key and FDs
278:
279: // key is null now
280: keys[index] = null;
281:
282: // free FDs connected with the key
283: // free indexes
284: int readableIndex = keysToReadableFDs[index];
285: if (readableIndex != -1) {
286: readableFDs[readableIndex] = null;
287: readableFDsToKeys[readableIndex] = -1;
288: keysToReadableFDs[index] = -1;
289: }
290:
291: int writableIndex = keysToWritableFDs[index];
292: if (writableIndex != -1) {
293: writableFDs[writableIndex] = null;
294: writableFDsToKeys[writableIndex] = -1;
295: keysToWritableFDs[index] = -1;
296: }
297:
298: // === compacting arrays and indexes
299:
300: // key compaction
301: if (keys[lastKeyIndex] != null) {
302: keys[index] = keys[lastKeyIndex];
303: keys[lastKeyIndex] = null;
304:
305: // update key index
306: ((SelectionKeyImpl) keys[index]).setIndex(index);
307:
308: // the key in the new position references the same FDs
309: keysToReadableFDs[index] = keysToReadableFDs[lastKeyIndex];
310: keysToWritableFDs[index] = keysToWritableFDs[lastKeyIndex];
311:
312: // associated FDs reference the same key at new index
313: if (keysToReadableFDs[index] != -1) {
314: readableFDsToKeys[keysToReadableFDs[index]] = index;
315: }
316:
317: if (keysToWritableFDs[index] != -1) {
318: writableFDsToKeys[keysToWritableFDs[index]] = index;
319: }
320:
321: }
322: lastKeyIndex--;
323:
324: // readableFDs compaction
325: if (readableIndex != -1) {
326: if (readableFDs[readableKeysCount - 1] != null) {
327: readableFDs[readableIndex] = readableFDs[readableKeysCount - 1];
328:
329: // new FD references the same key
330: readableFDsToKeys[readableIndex] = readableFDsToKeys[readableKeysCount - 1];
331:
332: // the key references the same FD at new index
333: if (readableFDsToKeys[readableIndex] != -1) {
334: keysToReadableFDs[readableFDsToKeys[readableIndex]] = readableIndex;
335: }
336: }
337: readableKeysCount--;
338: }
339:
340: // writableFDs compaction
341: if (writableIndex != -1) {
342: if (writableFDs[writableKeysCount - 1] != null) {
343: writableFDs[writableIndex] = writableFDs[writableKeysCount - 1];
344:
345: // new FD references the same key
346: writableFDsToKeys[writableIndex] = writableFDsToKeys[writableKeysCount - 1];
347:
348: // the key references the same FD at new index
349: if (writableFDsToKeys[writableIndex] != -1) {
350: keysToWritableFDs[writableFDsToKeys[writableIndex]] = writableIndex;
351: }
352: }
353: writableKeysCount--;
354: }
355: }
356:
357: /**
358: * Note that the given key has been modified
359: *
360: * @param sk
361: * the modified key.
362: */
363: void modKey(SelectionKey sk) {
364: // TODO: update indexes rather than recreate the key
365: synchronized (this ) {
366: synchronized (keysSet) {
367: synchronized (selectedKeys) {
368: delKey(sk);
369: addKey(sk);
370: }
371: }
372: }
373: }
374:
375: /**
376: * @see java.nio.channels.spi.AbstractSelector#register(java.nio.channels.spi.AbstractSelectableChannel,
377: * int, java.lang.Object)
378: */
379: @Override
380: protected SelectionKey register(AbstractSelectableChannel channel,
381: int operations, Object attachment) {
382: if (!provider().equals(channel.provider())) {
383: throw new IllegalSelectorException();
384: }
385: synchronized (this ) {
386: synchronized (keysSet) {
387:
388: // create the key
389: SelectionKey sk = new SelectionKeyImpl(channel,
390: operations, attachment, this );
391:
392: int index = addKey(sk);
393: ((SelectionKeyImpl) sk).setIndex(index);
394:
395: return sk;
396: }
397: }
398: }
399:
400: /**
401: * @see java.nio.channels.Selector#keys()
402: */
403: @Override
404: public synchronized Set<SelectionKey> keys() {
405: closeCheck();
406:
407: keysSet.clear();
408:
409: if (keys.length != lastKeyIndex + 1) {
410: SelectionKey[] chompedKeys = new SelectionKey[lastKeyIndex + 1];
411: System.arraycopy(keys, 0, chompedKeys, 0, lastKeyIndex + 1);
412: keysSet.addAll(Arrays.asList(chompedKeys));
413: } else {
414: keysSet.addAll(Arrays.asList(keys));
415: }
416:
417: keysSet.remove(source.keyFor(this ));
418: return unmodifiableKeys;
419: }
420:
421: /*
422: * Checks that the receiver is not closed. If it is throws an exception.
423: */
424: private void closeCheck() {
425: if (!isOpen()) {
426: throw new ClosedSelectorException();
427: }
428: }
429:
430: /**
431: * @see java.nio.channels.Selector#select()
432: */
433: @Override
434: public int select() throws IOException {
435: return selectInternal(SELECT_BLOCK);
436: }
437:
438: /**
439: * @see java.nio.channels.Selector#select(long)
440: */
441: @Override
442: public int select(long timeout) throws IOException {
443: if (timeout < 0) {
444: throw new IllegalArgumentException();
445: }
446: return selectInternal((0 == timeout) ? SELECT_BLOCK : timeout);
447: }
448:
449: /**
450: * @see java.nio.channels.Selector#selectNow()
451: */
452: @Override
453: public int selectNow() throws IOException {
454: return selectInternal(SELECT_NOW);
455: }
456:
457: private int selectInternal(long timeout) throws IOException {
458: closeCheck();
459: synchronized (this ) {
460: synchronized (keysSet) {
461: synchronized (selectedKeys) {
462: doCancel();
463: int[] readyChannels = null;
464: boolean isBlock = (SELECT_NOW != timeout);
465: prepareChannels();
466: try {
467: if (isBlock) {
468: begin();
469: }
470: readyChannels = Platform.getNetworkSystem()
471: .select(readableFDs, writableFDs,
472: timeout);
473: } finally {
474: if (isBlock) {
475: end();
476: }
477: }
478: return processSelectResult(readyChannels);
479: }
480: }
481: }
482: }
483:
484: private boolean isConnected(SelectionKeyImpl key) {
485: SelectableChannel channel = key.channel();
486: if (channel instanceof SocketChannel) {
487: return ((SocketChannel) channel).isConnected();
488: }
489: return true;
490: }
491:
492: /*
493: * Prepares and adds channels to list for selection
494: */
495: private void prepareChannels() {
496:
497: // chomp the arrays if needed
498:
499: if (readableFDs.length != readableKeysCount) {
500: FileDescriptor[] chompedReadableFDs = new FileDescriptor[readableKeysCount];
501: System.arraycopy(readableFDs, 0, chompedReadableFDs, 0,
502: readableKeysCount);
503: readableFDs = chompedReadableFDs;
504: }
505:
506: if (writableFDs.length != writableKeysCount) {
507: FileDescriptor[] chompedWriteableFDs = new FileDescriptor[writableKeysCount];
508: System.arraycopy(writableFDs, 0, chompedWriteableFDs, 0,
509: writableKeysCount);
510: writableFDs = chompedWriteableFDs;
511: }
512:
513: }
514:
515: /*
516: * Analyses selected channels and adds keys of ready channels to
517: * selectedKeys list.
518: *
519: * readyChannels are encoded as concatenated array of flags for readable
520: * channels followed by writable channels.
521: */
522: private int processSelectResult(int[] readyChannels)
523: throws IOException {
524: if (0 == readyChannels.length) {
525: return 0;
526: }
527: // if the mock channel is selected, read the content.
528: if (READABLE == readyChannels[0]) {
529: ByteBuffer readbuf = ByteBuffer.allocate(MOCK_READBUF_SIZE);
530: while (source.read(readbuf) > 0) {
531: readbuf.flip();
532: }
533: }
534: int selected = 0;
535:
536: for (int i = 1; i < readyChannels.length; i++) {
537:
538: if (readyChannels[i] != NA) {
539: SelectionKeyImpl key = (SelectionKeyImpl) (i >= readableKeysCount ? keys[writableFDsToKeys[i
540: - readableKeysCount]]
541: : keys[readableFDsToKeys[i]]);
542:
543: if (null == key) {
544: continue;
545: }
546:
547: int ops = key.interestOps();
548: int selectedOp = 0;
549:
550: switch (readyChannels[i]) {
551:
552: case READABLE:
553: selectedOp = (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)
554: & ops;
555: break;
556: case WRITEABLE:
557: if (isConnected(key)) {
558: selectedOp = SelectionKey.OP_WRITE & ops;
559: } else {
560: selectedOp = SelectionKey.OP_CONNECT & ops;
561: }
562: break;
563: }
564:
565: if (0 != selectedOp) {
566: boolean isOldSelectedKey = selectedKeys
567: .contains(key);
568: if (isOldSelectedKey
569: && key.readyOps() != selectedOp) {
570: key.setReadyOps(key.readyOps() | selectedOp);
571: selected++;
572: } else if (!isOldSelectedKey) {
573: key.setReadyOps(selectedOp);
574: selectedKeys.add(key);
575: selected++;
576: }
577: }
578:
579: }
580: }
581:
582: return selected;
583: }
584:
585: /**
586: * @see java.nio.channels.Selector#selectedKeys()
587: */
588: @Override
589: public synchronized Set<SelectionKey> selectedKeys() {
590: closeCheck();
591: return unaddableSelectedKeys;
592: }
593:
594: /*
595: * Assumes calling thread holds locks on 'this', 'keysSet', and 'selectedKeys'.
596: */
597: private void doCancel() {
598: Set<SelectionKey> cancelledKeys = cancelledKeys();
599: synchronized (cancelledKeys) {
600: if (cancelledKeys.size() > 0) {
601: for (SelectionKey currentkey : cancelledKeys) {
602: delKey(currentkey);
603: deregister((AbstractSelectionKey) currentkey);
604: selectedKeys.remove(currentkey);
605: }
606: }
607: cancelledKeys.clear();
608: limitCapacity();
609: }
610: }
611:
612: /**
613: * @see java.nio.channels.Selector#wakeup()
614: */
615: @Override
616: public Selector wakeup() {
617: try {
618: sink.write(ByteBuffer.allocate(MOCK_WRITEBUF_SIZE));
619: } catch (IOException e) {
620: // do nothing
621: }
622: return this ;
623: }
624:
625: private static class UnaddableSet<E> implements Set<E> {
626:
627: private Set<E> set;
628:
629: UnaddableSet(Set<E> set) {
630: this .set = set;
631: }
632:
633: @Override
634: public boolean equals(Object object) {
635: return set.equals(object);
636: }
637:
638: @Override
639: public int hashCode() {
640: return set.hashCode();
641: }
642:
643: public boolean add(E object) {
644: throw new UnsupportedOperationException();
645: }
646:
647: public boolean addAll(Collection<? extends E> c) {
648: throw new UnsupportedOperationException();
649: }
650:
651: public void clear() {
652: set.clear();
653: }
654:
655: public boolean contains(Object object) {
656: return set.contains(object);
657: }
658:
659: public boolean containsAll(Collection<?> c) {
660: return set.containsAll(c);
661: }
662:
663: public boolean isEmpty() {
664: return set.isEmpty();
665: }
666:
667: public Iterator<E> iterator() {
668: return set.iterator();
669: }
670:
671: public boolean remove(Object object) {
672: return set.remove(object);
673: }
674:
675: public boolean removeAll(Collection<?> c) {
676: return set.removeAll(c);
677: }
678:
679: public boolean retainAll(Collection<?> c) {
680: return set.retainAll(c);
681: }
682:
683: public int size() {
684: return set.size();
685: }
686:
687: public Object[] toArray() {
688: return set.toArray();
689: }
690:
691: public <T> T[] toArray(T[] a) {
692: return set.toArray(a);
693: }
694: }
695: }
|