001: /* Licensed to the Apache Software Foundation (ASF) under one or more
002: * contributor license agreements. See the NOTICE file distributed with
003: * this work for additional information regarding copyright ownership.
004: * The ASF licenses this file to You under the Apache License, Version 2.0
005: * (the "License"); you may not use this file except in compliance with
006: * the License. You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.harmony.nio.internal;
017:
018: import java.io.FileDescriptor;
019: import java.io.IOException;
020: import java.nio.ByteBuffer;
021: import java.nio.channels.ClosedSelectorException;
022: import java.nio.channels.IllegalSelectorException;
023: import java.nio.channels.Pipe;
024: import java.nio.channels.SelectableChannel;
025: import java.nio.channels.SelectionKey;
026: import java.nio.channels.Selector;
027: import java.nio.channels.SocketChannel;
028: import java.nio.channels.spi.AbstractSelectableChannel;
029: import java.nio.channels.spi.AbstractSelectionKey;
030: import java.nio.channels.spi.AbstractSelector;
031: import java.nio.channels.spi.SelectorProvider;
032: import java.util.Arrays;
033: import java.util.Collection;
034: import java.util.Collections;
035: import java.util.HashSet;
036: import java.util.Iterator;
037: import java.util.Set;
038: import java.lang.Class;
039:
040: import org.apache.harmony.luni.platform.FileDescriptorHandler;
041: import org.apache.harmony.luni.platform.Platform;
042:
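/*
 * epoll-based implementation of java.nio.channels.Selector. Readiness is
 * obtained through the native prepare/addFileDescriptor/delFileDescriptor/
 * epoll calls declared below; a pipe is used to implement wakeup().
 */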
047: final class EpollSelectorImpl extends AbstractSelector {
048:
049: private static final int MOCK_WRITEBUF_SIZE = 1;
050:
051: private static final int MOCK_READBUF_SIZE = 8;
052:
053: private static final int NA = 0;
054:
055: private static final int READABLE = 1;
056:
057: private static final int WRITABLE = 2;
058:
059: private static final int SELECT_BLOCK = -1;
060:
061: private static final int SELECT_NOW = 0;
062:
063: // keysLock is used to brief synchronization when get selectionKeys snapshot
064: // before selection
065: final Object keysLock = new Object();
066:
067: boolean keySetChanged = true;
068:
069: private SelectionKey[] keys = new SelectionKey[1];
070:
071: private final Set<SelectionKey> keysSet = new HashSet<SelectionKey>();
072:
073: private Set<SelectionKey> unmodifiableKeys = Collections
074: .unmodifiableSet(keysSet);
075:
076: private final Set<SelectionKey> selectedKeys = new HashSet<SelectionKey>();
077:
078: private Set<SelectionKey> unaddableSelectedKeys = new UnaddableSet<SelectionKey>(
079: selectedKeys);
080:
081: // sink and source are used by wakeup()
082: private Pipe.SinkChannel sink;
083:
084: private Pipe.SourceChannel source;
085:
086: private FileDescriptor sourcefd;
087:
088: private int[] keyFDs;
089:
090: private int[] readyFDs;
091:
092: private int[] readyOps;
093:
094: private int keysCount = 0;
095:
096: private long epollFD;
097:
098: private int countReady;
099:
100: public Class fileDescriptorClass;
101:
102: static native int resolveFD(Class cfd, FileDescriptor ofd);
103:
104: static native long prepare();
105:
106: static native long addFileDescriptor(long epollFD, int mode, int fd);
107:
108: static native long delFileDescriptor(long epollFD, long fd);
109:
110: static native int epoll(long epollFD, int count, int[] FDs,
111: int[] ops, long timeout);
112:
113: private InternalKeyMap<EpollSelectionKeyImpl> quickMap = new InternalKeyMap<EpollSelectionKeyImpl>();
114:
115: public EpollSelectorImpl(SelectorProvider selectorProvider) {
116: super (selectorProvider);
117: try {
118: Pipe mockSelector = selectorProvider.openPipe();
119: sink = mockSelector.sink();
120: source = mockSelector.source();
121: sourcefd = ((FileDescriptorHandler) source).getFD();
122: source.configureBlocking(false);
123:
124: fileDescriptorClass = sourcefd.getClass();
125:
126: keyFDs = new int[1];
127: readyFDs = new int[1];
128: readyOps = new int[1];
129:
130: // register sink channel
131: keyFDs[0] = resolveFD(fileDescriptorClass, sourcefd);
132: keys[0] = source.keyFor(this );
133: epollFD = prepare();
134:
135: keysCount = 1;
136:
137: quickMap.put(keyFDs[0], (EpollSelectionKeyImpl) keys[0]);
138: addFileDescriptor(epollFD, 1, keyFDs[0]);
139:
140: } catch (IOException e) {
141: // do nothing
142: }
143: }
144:
145: /*
146: * @see java.nio.channels.spi.AbstractSelector#implCloseSelector()
147: */
148: protected void implCloseSelector() throws IOException {
149: synchronized (this ) {
150: synchronized (keysSet) {
151: synchronized (selectedKeys) {
152: doCancel();
153: for (int c = 0; c < keysCount; c++) {
154: if (keys[c] != null) {
155: deregister((AbstractSelectionKey) keys[c]);
156: }
157: }
158: wakeup();
159: }
160: }
161: }
162: }
163:
164: private void ensureCapacity(int c) {
165: // TODO: rewrite array handling as some internal class
166: if (c >= keys.length) {
167: SelectionKey[] t = new SelectionKey[(keys.length + 1) << 1];
168: System.arraycopy(keys, 0, t, 0, keys.length);
169: keys = t;
170: }
171:
172: if (c >= readyFDs.length) {
173: int[] t = new int[(readyFDs.length + 1) << 1];
174: System.arraycopy(readyFDs, 0, t, 0, readyFDs.length);
175: readyFDs = t;
176: }
177:
178: if (c >= keyFDs.length) {
179: int[] t = new int[(keyFDs.length + 1) << 1];
180: System.arraycopy(keyFDs, 0, t, 0, keyFDs.length);
181: keyFDs = t;
182: }
183:
184: if (c >= readyOps.length) {
185: int[] t = new int[(readyOps.length + 1) << 1];
186: System.arraycopy(readyOps, 0, t, 0, readyOps.length);
187: readyOps = t;
188: }
189: }
190:
191: private void limitCapacity() {
192: // TODO: implement array squeezing
193: }
194:
195: /**
196: * Adds the specified key to storage and updates the indexes accordingly
197: *
198: * @param sk
199: * key to add
200: * @return index in the storage
201: */
202: private int addKey(SelectionKey sk) {
203:
204: // make sure that enough space is available
205: ensureCapacity(keysCount);
206:
207: // get channel params
208: int ops = sk.interestOps();
209: int fd = resolveFD(fileDescriptorClass,
210: ((FileDescriptorHandler) sk.channel()).getFD());
211:
212: int eops = 0;
213: if (((SelectionKey.OP_READ | SelectionKey.OP_ACCEPT) & ops) != 0) {
214: eops = eops + READABLE;
215: }
216: ;
217: if (((SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT) & ops) != 0) {
218: eops = eops + WRITABLE;
219: }
220: ;
221:
222: keys[keysCount] = sk;
223: keyFDs[keysCount] = fd;
224:
225: quickMap.put(fd, (EpollSelectionKeyImpl) sk);
226: addFileDescriptor(epollFD, eops, fd);
227:
228: return keysCount++;
229: }
230:
231: /**
232: * Deletes the key from the internal storage and updates the indexes
233: * accordingly
234: *
235: * @param sk
236: * key to delete
237: */
238: private void delKey(SelectionKey sk) {
239:
240: // get the key index in the internal storage
241: int index = ((EpollSelectionKeyImpl) sk).getIndex();
242:
243: // deregister FD in native
244: delFileDescriptor(epollFD, keyFDs[index]);
245:
246: if (quickMap.remove(keyFDs[index]) == null) {
247: throw new RuntimeException();
248: }
249: // key is null now
250: keys[index] = null;
251:
252: // key compaction to ensure lack of holes
253: // we can simply exchange latest and current keys
254: if (keys[keysCount - 1] != null) {
255: keys[index] = keys[keysCount - 1];
256: keys[keysCount - 1] = null;
257:
258: keyFDs[index] = keyFDs[keysCount - 1];
259: keyFDs[keysCount - 1] = -1;
260:
261: // update key index
262: ((EpollSelectionKeyImpl) keys[index]).setIndex(index);
263: }
264: keysCount--;
265: }
266:
267: /**
268: *
269: * @param sk
270: */
271: void modKey(SelectionKey sk) {
272: // TODO: update indexes rather than recreate the key
273: synchronized (keysSet) {
274: delKey(sk);
275: addKey(sk);
276: }
277: }
278:
279: /*
280: * @see java.nio.channels.spi.AbstractSelector#register(java.nio.channels.spi.AbstractSelectableChannel,
281: * int, java.lang.Object)
282: */
283: protected SelectionKey register(AbstractSelectableChannel channel,
284: int operations, Object attachment) {
285: if (!provider().equals(channel.provider())) {
286: throw new IllegalSelectorException();
287: }
288: synchronized (this ) {
289: synchronized (keysSet) {
290:
291: // System.out.println("Registering channel");
292: // create the key
293: SelectionKey sk = new EpollSelectionKeyImpl(channel,
294: operations, attachment, this );
295:
296: int index = addKey(sk);
297: ((EpollSelectionKeyImpl) sk).setIndex(index);
298:
299: // System.out.println(" channel registered with index = " +
300: // index);
301: return sk;
302: }
303: }
304: }
305:
306: /*
307: * @see java.nio.channels.Selector#keys()
308: */
309: public synchronized Set<SelectionKey> keys() {
310: closeCheck();
311:
312: keysSet.clear();
313:
314: if (keys.length != keysCount) {
315: SelectionKey[] chompedKeys = new SelectionKey[keysCount];
316: System.arraycopy(keys, 0, chompedKeys, 0, keysCount);
317: keysSet.addAll(Arrays.asList(chompedKeys));
318: } else {
319: keysSet.addAll(Arrays.asList(keys));
320: }
321:
322: keysSet.remove(source.keyFor(this ));
323: return unmodifiableKeys;
324: }
325:
326: private void closeCheck() {
327: if (!isOpen()) {
328: throw new ClosedSelectorException();
329: }
330: }
331:
332: /*
333: * @see java.nio.channels.Selector#select()
334: */
335: public int select() throws IOException {
336: return selectInternal(SELECT_BLOCK);
337: }
338:
339: /*
340: * @see java.nio.channels.Selector#select(long)
341: */
342: public int select(long timeout) throws IOException {
343: if (timeout < 0) {
344: throw new IllegalArgumentException();
345: }
346: return selectInternal((0 == timeout) ? SELECT_BLOCK : timeout);
347: }
348:
349: /*
350: * @see java.nio.channels.Selector#selectNow()
351: */
352: public int selectNow() throws IOException {
353: return selectInternal(SELECT_NOW);
354: }
355:
356: private int selectInternal(long timeout) throws IOException {
357: closeCheck();
358: synchronized (this ) {
359: synchronized (keysSet) {
360: synchronized (selectedKeys) {
361: doCancel();
362: boolean isBlock = (SELECT_NOW != timeout);
363: try {
364: if (isBlock) {
365: begin();
366: }
367: // System.out.println("calling native epoll(): keysCount
368: // = " + keysCount + ", readyFDs.length = " +
369: // readyFDs.length + ", readyOps.length = " +
370: // readyOps.length);
371: countReady = epoll(epollFD, keysCount,
372: readyFDs, readyOps, timeout);
373: // System.out.println(" returns " + countReady);
374: } finally {
375: if (isBlock) {
376: end();
377: }
378: }
379: return processSelectResult();
380: }
381: }
382: }
383: }
384:
385: private boolean isConnected(EpollSelectionKeyImpl key) {
386: SelectableChannel channel = key.channel();
387: if (channel instanceof SocketChannel) {
388: return ((SocketChannel) channel).isConnected();
389: }
390: return true;
391: }
392:
393: /*
394: * Analyses selected channels and adds keys of ready channels to
395: * selectedKeys list.
396: *
397: * readyChannels are encoded as concatenated array of flags for readable
398: * channels followed by writable channels.
399: */
400: private int processSelectResult() throws IOException {
401: if (0 == countReady) {
402: return 0;
403: }
404: if (-1 == countReady) {
405: return 0;
406: }
407: // if the mock channel is selected, read the content.
408: if (READABLE == readyOps[0]) {
409: ByteBuffer readbuf = ByteBuffer.allocate(MOCK_READBUF_SIZE);
410: while (source.read(readbuf) > 0) {
411: readbuf.flip();
412: }
413: }
414: int selected = 0;
415:
416: EpollSelectionKeyImpl key = null;
417: for (int i = 0; i < countReady; i++) {
418:
419: // System.out.println("processSelectResults(): mapping readyFDs[" +
420: // i + "]");
421: // Lookup the key, map the index in readyFDs to real key
422: key = (EpollSelectionKeyImpl) quickMap.get(readyFDs[i]);
423:
424: if (null == key) {
425: continue;
426: }
427: // System.out.println(" ready key = " + key.getIndex());
428:
429: int ops = key.interestOps();
430: int selectedOp = 0;
431:
432: if ((readyOps[i] & READABLE) != 0) {
433: selectedOp = (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)
434: & ops;
435: }
436:
437: if ((readyOps[i] & WRITABLE) != 0) {
438: if (isConnected(key)) {
439: selectedOp = selectedOp
440: | (SelectionKey.OP_WRITE & ops);
441: } else {
442: selectedOp = selectedOp
443: | (SelectionKey.OP_CONNECT & ops);
444: }
445: }
446:
447: if (0 != selectedOp) {
448: if (selectedKeys.contains(key)) {
449: if (key.readyOps() != selectedOp) {
450: key.setReadyOps(key.readyOps() | selectedOp);
451: selected++;
452: }
453: } else {
454: key.setReadyOps(selectedOp);
455: selectedKeys.add(key);
456: selected++;
457: }
458: }
459:
460: }
461:
462: // System.out.println("processSelectResult(): selected = " + selected);
463: return selected;
464: }
465:
466: /*
467: * @see java.nio.channels.Selector#selectedKeys()
468: */
469: public synchronized Set<SelectionKey> selectedKeys() {
470: closeCheck();
471: return unaddableSelectedKeys;
472: }
473:
474: private void doCancel() {
475: Set<SelectionKey> cancelledKeys = cancelledKeys();
476: synchronized (cancelledKeys) {
477: if (cancelledKeys.size() > 0) {
478: for (SelectionKey currentkey : cancelledKeys) {
479: delKey(currentkey);
480: deregister((AbstractSelectionKey) currentkey);
481: selectedKeys.remove(currentkey);
482: }
483: }
484: cancelledKeys.clear();
485: limitCapacity();
486: }
487: }
488:
489: /*
490: * @see java.nio.channels.Selector#wakeup()
491: */
492: public Selector wakeup() {
493: try {
494: sink.write(ByteBuffer.allocate(MOCK_WRITEBUF_SIZE));
495: } catch (IOException e) {
496: // do nothing
497: }
498: return this ;
499: }
500:
501: private static class InternalKeyMap<E> {
502:
503: Entry<E>[] storage;
504:
505: int size;
506:
507: Entry deleted = new Entry(-1, null);
508:
509: final int threshRatio = 4;
510:
511: public InternalKeyMap() {
512: this (1);
513: }
514:
515: public InternalKeyMap(int initialSize) {
516: storage = new Entry[initialSize];
517: size = 0;
518: }
519:
520: private E putEntryNoCheck(Entry<E>[] storage, int key,
521: Entry<E> entry) {
522:
523: for (int tryCount = 0; tryCount < storage.length; tryCount++) {
524:
525: int hash = hash(key, tryCount);
526: int index = hash % storage.length;
527:
528: // System.out.println("put: hash: " + hash + ", index: " + index
529: // + ", key: " + key + ", size: " + size + ", tryCount: " +
530: // tryCount + ", storage.length=" + storage.length);
531:
532: if (storage[index] == null) {
533: if (entry != deleted) {
534: storage[index] = entry;
535: }
536: return null;
537: } else if (storage[index].key == key
538: || (storage[index] == deleted && entry != deleted)) {
539: E t = storage[index].value;
540: storage[index] = entry;
541: return t;
542: }
543: }
544:
545: throw new ArrayIndexOutOfBoundsException();
546: }
547:
548: private E putEntry(int key, Entry<E> entry) {
549: if (size >= storage.length
550: || (storage.length / (storage.length - size)) >= threshRatio) {
551: rehash();
552: }
553:
554: E result = putEntryNoCheck(storage, key, entry);
555: if (result == null) {
556: size++;
557: }
558:
559: return result;
560: }
561:
562: public void put(int key, E value) {
563: Entry<E> t = new Entry<E>(key, value);
564: putEntry(key, t);
565: }
566:
567: public E remove(int key) {
568: E result = putEntryNoCheck(storage, key, deleted);
569:
570: if (result != null) {
571: size--;
572: }
573:
574: return result;
575: }
576:
577: public E get(int key) {
578: if (storage == null) {
579: // System.out.println(" FAIL, storage=null");
580: return null;
581: }
582:
583: for (int tryCount = 0; tryCount < storage.length; tryCount++) {
584: int hash = hash(key, tryCount);
585: int index = hash % storage.length;
586:
587: // System.out.println("get: hash: " + hash + ", index: " + index
588: // + ", key: " + key + ", size: " + size + ", tryCount: " +
589: // tryCount + ", storage.length=" + storage.length);
590:
591: if (storage[index] == null) {
592: // System.out.println("Lookup FAIL, reached end");
593: return null;
594: }
595:
596: if (storage[index].key == key) {
597: // System.out.println("Lookup OK!");
598: return storage[index].value;
599: }
600:
601: }
602: // System.out.println(" FAIL, tryCount > storage.length");
603: return null;
604: }
605:
606: private void rehash() {
607: Entry<E>[] newStorage = new Entry[storage.length << 1];
608: int newSize = 0;
609: for (int c = 0; c < storage.length; c++) {
610: if (storage[c] == null)
611: continue;
612: if (storage[c] == deleted)
613: continue;
614: putEntryNoCheck(newStorage, storage[c].key, storage[c]);
615: newSize++;
616: }
617: storage = newStorage;
618: size = newSize;
619: }
620:
621: private int hash(int key, int tryCount) {
622: int t1 = key * 31 + 1;
623: int t2 = 2 * key + 1;
624: return (t1 + (t2 * tryCount)) & 0x7FFFFFFF;
625: }
626:
627: private static class Entry<E> {
628:
629: final int key;
630:
631: final E value;
632:
633: public Entry(int iKey, E iValue) {
634: key = iKey;
635: value = iValue;
636: }
637:
638: }
639:
640: }
641:
642: private static class UnaddableSet<E> implements Set<E> {
643:
644: private Set<E> set;
645:
646: UnaddableSet(Set<E> set) {
647: this .set = set;
648: }
649:
650: public boolean equals(Object object) {
651: return set.equals(object);
652: }
653:
654: public int hashCode() {
655: return set.hashCode();
656: }
657:
658: public boolean add(E object) {
659: throw new UnsupportedOperationException();
660: }
661:
662: public boolean addAll(Collection<? extends E> c) {
663: throw new UnsupportedOperationException();
664: }
665:
666: public void clear() {
667: set.clear();
668: }
669:
670: public boolean contains(Object object) {
671: return set.contains(object);
672: }
673:
674: public boolean containsAll(Collection<?> c) {
675: return set.containsAll(c);
676: }
677:
678: public boolean isEmpty() {
679: return set.isEmpty();
680: }
681:
682: public Iterator<E> iterator() {
683: return set.iterator();
684: }
685:
686: public boolean remove(Object object) {
687: return set.remove(object);
688: }
689:
690: public boolean removeAll(Collection<?> c) {
691: return set.removeAll(c);
692: }
693:
694: public boolean retainAll(Collection<?> c) {
695: return set.retainAll(c);
696: }
697:
698: public int size() {
699: return set.size();
700: }
701:
702: public Object[] toArray() {
703: return set.toArray();
704: }
705:
706: public <T> T[] toArray(T[] a) {
707: return set.toArray(a);
708: }
709: }
710:
711: }
|