001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.tomcat.util.net;
018:
019: import java.util.concurrent.atomic.AtomicInteger;
020: import java.nio.channels.Selector;
021: import java.io.IOException;
022: import java.util.NoSuchElementException;
023: import java.nio.ByteBuffer;
024: import java.nio.channels.SelectionKey;
025: import java.io.EOFException;
026: import java.net.SocketTimeoutException;
027: import java.util.concurrent.ConcurrentLinkedQueue;
028: import org.apache.juli.logging.Log;
029: import org.apache.juli.logging.LogFactory;
030:
031: /**
032: *
033: * Thread safe non blocking selector pool
034: * @author Filip Hanik
035: * @version 1.0
036: * @since 6.0
037: */
038:
039: public class NioSelectorPool {
040: protected static Log log = LogFactory.getLog(NioSelectorPool.class);
041:
042: protected final static boolean SHARED = Boolean.valueOf(
043: System.getProperty(
044: "org.apache.tomcat.util.net.NioSelectorShared",
045: "true")).booleanValue();
046: protected static Selector SHARED_SELECTOR;
047:
048: protected int maxSelectors = 200;
049: protected int maxSpareSelectors = -1;
050: protected boolean enabled = true;
051: protected AtomicInteger active = new AtomicInteger(0);
052: protected AtomicInteger spare = new AtomicInteger(0);
053: protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
054:
055: protected static Selector getSharedSelector() throws IOException {
056: if (SHARED && SHARED_SELECTOR == null) {
057: synchronized (NioSelectorPool.class) {
058: if (SHARED_SELECTOR == null) {
059: SHARED_SELECTOR = Selector.open();
060: log
061: .info("Using a shared selector for servlet write/read");
062: }
063: }
064: }
065: return SHARED_SELECTOR;
066: }
067:
068: public Selector get() throws IOException {
069: if (SHARED) {
070: return getSharedSelector();
071: }
072: if ((!enabled) || active.incrementAndGet() >= maxSelectors) {
073: if (enabled)
074: active.decrementAndGet();
075: return null;
076: }
077: Selector s = null;
078: try {
079: s = selectors.size() > 0 ? selectors.poll() : null;
080: if (s == null)
081: s = Selector.open();
082: else
083: spare.decrementAndGet();
084:
085: } catch (NoSuchElementException x) {
086: try {
087: s = Selector.open();
088: } catch (IOException iox) {
089: }
090: } finally {
091: if (s == null)
092: active.decrementAndGet();//we were unable to find a selector
093: }
094: return s;
095: }
096:
097: public void put(Selector s) throws IOException {
098: if (SHARED)
099: return;
100: if (enabled)
101: active.decrementAndGet();
102: if (enabled
103: && (maxSpareSelectors == -1 || spare.get() < Math.min(
104: maxSpareSelectors, maxSelectors))) {
105: spare.incrementAndGet();
106: selectors.offer(s);
107: } else
108: s.close();
109: }
110:
111: public void close() throws IOException {
112: enabled = false;
113: Selector s;
114: while ((s = selectors.poll()) != null)
115: s.close();
116: spare.set(0);
117: active.set(0);
118: if (SHARED && getSharedSelector() != null) {
119: getSharedSelector().close();
120: SHARED_SELECTOR = null;
121: }
122: }
123:
124: public void open() throws IOException {
125: enabled = true;
126: getSharedSelector();
127: }
128:
129: /**
130: * Performs a blocking write using the bytebuffer for data to be written and a selector to block.
131: * If the <code>selector</code> parameter is null, then it will perform a busy write that could
132: * take up a lot of CPU cycles.
133: * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
134: * @param socket SocketChannel - the socket to write data to
135: * @param selector Selector - the selector to use for blocking, if null then a busy write will be initiated
136: * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout
137: * @return int - returns the number of bytes written
138: * @throws EOFException if write returns -1
139: * @throws SocketTimeoutException if the write times out
140: * @throws IOException if an IO Exception occurs in the underlying socket logic
141: */
142: public int write(ByteBuffer buf, NioChannel socket,
143: Selector selector, long writeTimeout) throws IOException {
144: return write(buf, socket, selector, writeTimeout, true);
145: }
146:
147: public int write(ByteBuffer buf, NioChannel socket,
148: Selector selector, long writeTimeout, boolean block)
149: throws IOException {
150: if (SHARED && block) {
151: return NioBlockingSelector.write(buf, socket, writeTimeout);
152: }
153: SelectionKey key = null;
154: int written = 0;
155: boolean timedout = false;
156: int keycount = 1; //assume we can write
157: long time = System.currentTimeMillis(); //start the timeout timer
158: if (socket.getBufHandler().getWriteBuffer() != buf) {
159: socket.getBufHandler().getWriteBuffer().put(buf);
160: buf = socket.getBufHandler().getWriteBuffer();
161: }
162: try {
163: while ((!timedout) && buf.hasRemaining()) {
164: int cnt = 0;
165: if (keycount > 0) { //only write if we were registered for a write
166: cnt = socket.write(buf); //write the data
167: if (cnt == -1)
168: throw new EOFException();
169: written += cnt;
170: if (cnt > 0) {
171: time = System.currentTimeMillis(); //reset our timeout timer
172: continue; //we successfully wrote, try again without a selector
173: }
174: if (cnt == 0 && (!block))
175: break; //don't block
176: }
177: if (selector != null) {
178: //register OP_WRITE to the selector
179: if (key == null)
180: key = socket.getIOChannel().register(selector,
181: SelectionKey.OP_WRITE);
182: else
183: key.interestOps(SelectionKey.OP_WRITE);
184: keycount = selector.select(writeTimeout);
185: }
186: if (writeTimeout > 0
187: && (selector == null || keycount == 0))
188: timedout = (System.currentTimeMillis() - time) >= writeTimeout;
189: }//while
190: if (timedout)
191: throw new SocketTimeoutException();
192: } finally {
193: if (key != null) {
194: key.cancel();
195: if (selector != null)
196: selector.selectNow();//removes the key from this selector
197: }
198: }
199: return written;
200: }
201:
202: /**
203: * Performs a blocking read using the bytebuffer for data to be read and a selector to block.
204: * If the <code>selector</code> parameter is null, then it will perform a busy read that could
205: * take up a lot of CPU cycles.
206: * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
207: * @param socket SocketChannel - the socket to write data to
208: * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
209: * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
210: * @return int - returns the number of bytes read
211: * @throws EOFException if read returns -1
212: * @throws SocketTimeoutException if the read times out
213: * @throws IOException if an IO Exception occurs in the underlying socket logic
214: */
215: public int read(ByteBuffer buf, NioChannel socket,
216: Selector selector, long readTimeout) throws IOException {
217: return read(buf, socket, selector, readTimeout, true);
218: }
219:
220: /**
221: * Performs a read using the bytebuffer for data to be read and a selector to register for events should
222: * you have the block=true.
223: * If the <code>selector</code> parameter is null, then it will perform a busy read that could
224: * take up a lot of CPU cycles.
225: * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
226: * @param socket SocketChannel - the socket to write data to
227: * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
228: * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
229: * @param block - true if you want to block until data becomes available or timeout time has been reached
230: * @return int - returns the number of bytes read
231: * @throws EOFException if read returns -1
232: * @throws SocketTimeoutException if the read times out
233: * @throws IOException if an IO Exception occurs in the underlying socket logic
234: */
235: public int read(ByteBuffer buf, NioChannel socket,
236: Selector selector, long readTimeout, boolean block)
237: throws IOException {
238: if (SHARED && block) {
239: return NioBlockingSelector.read(buf, socket, readTimeout);
240: }
241: SelectionKey key = null;
242: int read = 0;
243: boolean timedout = false;
244: int keycount = 1; //assume we can write
245: long time = System.currentTimeMillis(); //start the timeout timer
246: try {
247: while ((!timedout)) {
248: int cnt = 0;
249: if (keycount > 0) { //only read if we were registered for a read
250: cnt = socket.read(buf);
251: if (cnt == -1)
252: throw new EOFException();
253: read += cnt;
254: if (cnt > 0)
255: continue; //read some more
256: if (cnt == 0 && (read > 0 || (!block)))
257: break; //we are done reading
258: }
259: if (selector != null) {//perform a blocking read
260: //register OP_WRITE to the selector
261: if (key == null)
262: key = socket.getIOChannel().register(selector,
263: SelectionKey.OP_READ);
264: else
265: key.interestOps(SelectionKey.OP_READ);
266: keycount = selector.select(readTimeout);
267: }
268: if (readTimeout > 0
269: && (selector == null || keycount == 0))
270: timedout = (System.currentTimeMillis() - time) >= readTimeout;
271: }//while
272: if (timedout)
273: throw new SocketTimeoutException();
274: } finally {
275: if (key != null) {
276: key.cancel();
277: if (selector != null)
278: selector.selectNow();//removes the key from this selector
279: }
280: }
281: return read;
282: }
283:
284: public void setMaxSelectors(int maxSelectors) {
285: this .maxSelectors = maxSelectors;
286: }
287:
288: public void setMaxSpareSelectors(int maxSpareSelectors) {
289: this .maxSpareSelectors = maxSpareSelectors;
290: }
291:
292: public void setEnabled(boolean enabled) {
293: this .enabled = enabled;
294: }
295:
296: public int getMaxSelectors() {
297: return maxSelectors;
298: }
299:
300: public int getMaxSpareSelectors() {
301: return maxSpareSelectors;
302: }
303:
304: public boolean isEnabled() {
305: return enabled;
306: }
307: }
|