001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.tomcat.util.net;
018:
019: import java.io.EOFException;
020: import java.io.IOException;
021: import java.net.SocketTimeoutException;
022: import java.nio.ByteBuffer;
023: import java.nio.channels.SelectionKey;
024: import java.util.concurrent.TimeUnit;
025:
026: import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
027:
028: public class NioBlockingSelector {
029: public NioBlockingSelector() {
030: }
031:
032: /**
033: * Performs a blocking write using the bytebuffer for data to be written
034: * If the <code>selector</code> parameter is null, then it will perform a busy write that could
035: * take up a lot of CPU cycles.
036: * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
037: * @param socket SocketChannel - the socket to write data to
038: * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout
039: * @return int - returns the number of bytes written
040: * @throws EOFException if write returns -1
041: * @throws SocketTimeoutException if the write times out
042: * @throws IOException if an IO Exception occurs in the underlying socket logic
043: */
044: public static int write(ByteBuffer buf, NioChannel socket,
045: long writeTimeout) throws IOException {
046: SelectionKey key = socket.getIOChannel().keyFor(
047: socket.getPoller().getSelector());
048: int written = 0;
049: boolean timedout = false;
050: int keycount = 1; //assume we can write
051: long time = System.currentTimeMillis(); //start the timeout timer
052: if (socket.getBufHandler().getWriteBuffer() != buf) {
053: socket.getBufHandler().getWriteBuffer().put(buf);
054: buf = socket.getBufHandler().getWriteBuffer();
055: }
056: try {
057: while ((!timedout) && buf.hasRemaining()) {
058: if (keycount > 0) { //only write if we were registered for a write
059: int cnt = socket.write(buf); //write the data
060: if (cnt == -1)
061: throw new EOFException();
062: written += cnt;
063: if (cnt > 0) {
064: time = System.currentTimeMillis(); //reset our timeout timer
065: continue; //we successfully wrote, try again without a selector
066: }
067: }
068: if (key == null)
069: throw new IOException("Key no longer registered");
070: KeyAttachment att = (KeyAttachment) key.attachment();
071: try {
072: if (att.getWriteLatch() == null
073: || att.getWriteLatch().getCount() == 0)
074: att.startWriteLatch(1);
075: //only register for write if a write has not yet been issued
076: if ((att.interestOps() & SelectionKey.OP_WRITE) == 0)
077: socket.getPoller().add(socket,
078: SelectionKey.OP_WRITE);
079: att.awaitWriteLatch(writeTimeout,
080: TimeUnit.MILLISECONDS);
081: } catch (InterruptedException ignore) {
082: Thread.interrupted();
083: }
084: if (att.getWriteLatch() != null
085: && att.getWriteLatch().getCount() > 0) {
086: //we got interrupted, but we haven't received notification from the poller.
087: keycount = 0;
088: } else {
089: //latch countdown has happened
090: keycount = 1;
091: att.resetWriteLatch();
092: }
093:
094: if (writeTimeout > 0 && (keycount == 0))
095: timedout = (System.currentTimeMillis() - time) >= writeTimeout;
096: } //while
097: if (timedout)
098: throw new SocketTimeoutException();
099: } finally {
100: if (timedout && key != null) {
101: cancelKey(socket, key);
102: }
103: }
104: return written;
105: }
106:
107: private static void cancelKey(final NioChannel socket,
108: final SelectionKey key) {
109: socket.getPoller().addEvent(new Runnable() {
110: public void run() {
111: key.cancel();
112: }
113: });
114: }
115:
116: /**
117: * Performs a blocking read using the bytebuffer for data to be read
118: * If the <code>selector</code> parameter is null, then it will perform a busy read that could
119: * take up a lot of CPU cycles.
120: * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
121: * @param socket SocketChannel - the socket to write data to
122: * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
123: * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
124: * @return int - returns the number of bytes read
125: * @throws EOFException if read returns -1
126: * @throws SocketTimeoutException if the read times out
127: * @throws IOException if an IO Exception occurs in the underlying socket logic
128: */
129: public static int read(ByteBuffer buf, NioChannel socket,
130: long readTimeout) throws IOException {
131: final SelectionKey key = socket.getIOChannel().keyFor(
132: socket.getPoller().getSelector());
133: int read = 0;
134: boolean timedout = false;
135: int keycount = 1; //assume we can write
136: long time = System.currentTimeMillis(); //start the timeout timer
137: try {
138: while ((!timedout) && read == 0) {
139: if (keycount > 0) { //only read if we were registered for a read
140: int cnt = socket.read(buf);
141: if (cnt == -1)
142: throw new EOFException();
143: read += cnt;
144: if (cnt > 0)
145: break;
146: }
147: KeyAttachment att = (KeyAttachment) key.attachment();
148: try {
149: if (att.getReadLatch() == null
150: || att.getReadLatch().getCount() == 0)
151: att.startReadLatch(1);
152: if (att.interestOps() == 0)
153: socket.getPoller().add(socket,
154: SelectionKey.OP_READ);
155: att.awaitReadLatch(readTimeout,
156: TimeUnit.MILLISECONDS);
157: } catch (InterruptedException ignore) {
158: Thread.interrupted();
159: }
160: if (att.getReadLatch() != null
161: && att.getReadLatch().getCount() > 0) {
162: //we got interrupted, but we haven't received notification from the poller.
163: keycount = 0;
164: } else {
165: //latch countdown has happened
166: keycount = 1;
167: att.resetReadLatch();
168: }
169: if (readTimeout > 0 && (keycount == 0))
170: timedout = (System.currentTimeMillis() - time) >= readTimeout;
171: } //while
172: if (timedout)
173: throw new SocketTimeoutException();
174: } finally {
175: if (timedout && key != null) {
176: cancelKey(socket, key);
177: }
178: }
179: return read;
180: }
181:
182: }
|