001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.transport.socket.apr;
021:
022: import java.io.IOException;
023: import java.nio.ByteBuffer;
024: import java.util.HashMap;
025: import java.util.Iterator;
026: import java.util.List;
027: import java.util.Map;
028: import java.util.concurrent.Executor;
029:
030: import org.apache.mina.common.AbstractPollingIoProcessor;
031: import org.apache.mina.common.FileRegion;
032: import org.apache.mina.common.IoBuffer;
033: import org.apache.mina.common.RuntimeIoException;
034: import org.apache.mina.util.CircularQueue;
035: import org.apache.tomcat.jni.Poll;
036: import org.apache.tomcat.jni.Pool;
037: import org.apache.tomcat.jni.Socket;
038: import org.apache.tomcat.jni.Status;
039:
040: /**
041: * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
042: *
043: * @author The Apache MINA Project (dev@mina.apache.org)
044: * @version $Rev: 599788 $, $Date: 2007-11-30 04:42:57 -0700 (Fri, 30 Nov 2007) $
045: */
046:
047: public final class AprIoProcessor extends
048: AbstractPollingIoProcessor<AprSession> {
049: private static final int POLLSET_SIZE = 1024;
050:
051: private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(
052: POLLSET_SIZE);
053:
054: private final Object wakeupLock = new Object();
055: private long wakeupSocket;
056: private volatile boolean toBeWakenUp;
057:
058: private final long bufferPool; // memory pool
059: private final long pollset; // socket poller
060: private final long[] polledSockets = new long[POLLSET_SIZE << 1];
061: private final List<AprSession> polledSessions = new CircularQueue<AprSession>(
062: POLLSET_SIZE);
063:
064: public AprIoProcessor(Executor executor) {
065: super (executor);
066:
067: try {
068: wakeupSocket = Socket.create(Socket.APR_INET,
069: Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, AprLibrary
070: .getInstance().getRootPool());
071: } catch (RuntimeException e) {
072: throw e;
073: } catch (Error e) {
074: throw e;
075: } catch (Exception e) {
076: throw new RuntimeIoException(
077: "Failed to create a wakeup socket.", e);
078: }
079:
080: // initialize a memory pool for APR functions
081: bufferPool = Pool
082: .create(AprLibrary.getInstance().getRootPool());
083:
084: boolean success = false;
085: long newPollset;
086: try {
087: newPollset = Poll.create(POLLSET_SIZE, AprLibrary
088: .getInstance().getRootPool(),
089: Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
090:
091: if (newPollset == 0) {
092: newPollset = Poll.create(62, AprLibrary.getInstance()
093: .getRootPool(), Poll.APR_POLLSET_THREADSAFE,
094: Long.MAX_VALUE);
095: }
096:
097: pollset = newPollset;
098: if (pollset < 0) {
099: if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
100: throw new RuntimeIoException(
101: "Thread-safe pollset is not supported in this platform.");
102: }
103: }
104: success = true;
105: } catch (RuntimeException e) {
106: throw e;
107: } catch (Error e) {
108: throw e;
109: } catch (Exception e) {
110: throw new RuntimeIoException("Failed to create a pollset.",
111: e);
112: } finally {
113: if (!success) {
114: dispose();
115: }
116: }
117: }
118:
119: @Override
120: protected void dispose0() {
121: Poll.destroy(pollset);
122: Pool.destroy(bufferPool);
123: Socket.close(wakeupSocket);
124: }
125:
126: @Override
127: protected boolean select(int timeout) throws Exception {
128: int rv = Poll.poll(pollset, 1000 * timeout, polledSockets,
129: false);
130: if (rv <= 0) {
131: if (rv != -120001) {
132: throwException(rv);
133: }
134:
135: rv = Poll.maintain(pollset, polledSockets, true);
136: if (rv > 0) {
137: for (int i = 0; i < rv; i++) {
138: long socket = polledSockets[i];
139: AprSession session = allSessions.get(socket);
140: if (session == null) {
141: continue;
142: }
143:
144: int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN
145: : 0)
146: | (session.isInterestedInWrite() ? Poll.APR_POLLOUT
147: : 0);
148:
149: Poll.add(pollset, socket, flag);
150: }
151: } else if (rv < 0) {
152: throwException(rv);
153: }
154:
155: return false;
156: } else {
157: rv <<= 1;
158: if (!polledSessions.isEmpty()) {
159: polledSessions.clear();
160: }
161: for (int i = 0; i < rv; i++) {
162: long flag = polledSockets[i];
163: long socket = polledSockets[++i];
164: if (socket == wakeupSocket) {
165: synchronized (wakeupLock) {
166: Poll.remove(pollset, wakeupSocket);
167: toBeWakenUp = false;
168: }
169: continue;
170: }
171: AprSession session = allSessions.get(socket);
172: if (session == null) {
173: continue;
174: }
175:
176: session.setReadable((flag & Poll.APR_POLLIN) != 0);
177: session.setWritable((flag & Poll.APR_POLLOUT) != 0);
178:
179: polledSessions.add(session);
180: }
181:
182: return !polledSessions.isEmpty();
183: }
184: }
185:
186: @Override
187: protected void wakeup() {
188: if (toBeWakenUp) {
189: return;
190: }
191:
192: // Add a dummy socket to the pollset.
193: synchronized (wakeupLock) {
194: toBeWakenUp = true;
195: Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
196: }
197: }
198:
199: @Override
200: protected Iterator<AprSession> allSessions() {
201: return allSessions.values().iterator();
202: }
203:
204: @Override
205: protected Iterator<AprSession> selectedSessions() {
206: return polledSessions.iterator();
207: }
208:
209: @Override
210: protected void init(AprSession session) throws Exception {
211: long s = session.getDescriptor();
212: Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
213: Socket.timeoutSet(s, 0);
214:
215: int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
216: if (rv != Status.APR_SUCCESS) {
217: throwException(rv);
218: }
219:
220: session.setInterestedInRead(true);
221: allSessions.put(s, session);
222: }
223:
224: @Override
225: protected void destroy(AprSession session) throws Exception {
226: allSessions.remove(session.getDescriptor());
227: int ret = Poll.remove(pollset, session.getDescriptor());
228: try {
229: if (ret != Status.APR_SUCCESS) {
230: throwException(ret);
231: }
232: } finally {
233: ret = Socket.close(session.getDescriptor());
234: if (ret != Status.APR_SUCCESS) {
235: throwException(ret);
236: }
237: }
238: }
239:
240: @Override
241: protected SessionState state(AprSession session) {
242: long socket = session.getDescriptor();
243: if (socket > 0) {
244: return SessionState.OPEN;
245: } else if (allSessions.get(socket) != null) {
246: return SessionState.PREPARING; // will occur ?
247: } else {
248: return SessionState.CLOSED;
249: }
250: }
251:
252: @Override
253: protected boolean isReadable(AprSession session) {
254: return session.isReadable();
255: }
256:
257: @Override
258: protected boolean isWritable(AprSession session) {
259: return session.isWritable();
260: }
261:
262: @Override
263: protected boolean isInterestedInRead(AprSession session) {
264: return session.isInterestedInRead();
265: }
266:
267: @Override
268: protected boolean isInterestedInWrite(AprSession session) {
269: return session.isInterestedInWrite();
270: }
271:
272: @Override
273: protected void setInterestedInRead(AprSession session, boolean value)
274: throws Exception {
275: int rv = Poll.remove(pollset, session.getDescriptor());
276: if (rv != Status.APR_SUCCESS) {
277: throwException(rv);
278: }
279:
280: int flags = (value ? Poll.APR_POLLIN : 0)
281: | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
282:
283: rv = Poll.add(pollset, session.getDescriptor(), flags);
284: if (rv == Status.APR_SUCCESS) {
285: session.setInterestedInRead(value);
286: } else {
287: throwException(rv);
288: }
289: }
290:
291: @Override
292: protected void setInterestedInWrite(AprSession session,
293: boolean value) throws Exception {
294: int rv = Poll.remove(pollset, session.getDescriptor());
295: if (rv != Status.APR_SUCCESS) {
296: throwException(rv);
297: }
298:
299: int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
300: | (value ? Poll.APR_POLLOUT : 0);
301:
302: rv = Poll.add(pollset, session.getDescriptor(), flags);
303: if (rv == Status.APR_SUCCESS) {
304: session.setInterestedInWrite(value);
305: } else {
306: throwException(rv);
307: }
308: }
309:
310: @Override
311: protected int read(AprSession session, IoBuffer buffer)
312: throws Exception {
313: int bytes;
314: // Using Socket.recv() directly causes memory leak. :-(
315: ByteBuffer b = Pool.alloc(bufferPool, buffer.remaining());
316: try {
317: bytes = Socket.recvb(session.getDescriptor(), b, 0, b
318: .remaining());
319: if (bytes > 0) {
320: b.position(0);
321: b.limit(bytes);
322: buffer.put(b);
323: } else if (bytes < 0) {
324: if (Status.APR_STATUS_IS_EOF(-bytes)) {
325: bytes = -1;
326: } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
327: bytes = 0;
328: } else {
329: throwException(bytes);
330: }
331: }
332: } finally {
333: Pool.clear(bufferPool);
334: }
335: return bytes;
336: }
337:
338: @Override
339: protected int write(AprSession session, IoBuffer buf, int length)
340: throws Exception {
341: int writtenBytes;
342: if (buf.isDirect()) {
343: writtenBytes = Socket.sendb(session.getDescriptor(), buf
344: .buf(), buf.position(), length);
345: } else {
346: writtenBytes = Socket.send(session.getDescriptor(), buf
347: .array(), buf.position(), length);
348: if (writtenBytes > 0) {
349: buf.skip(writtenBytes);
350: }
351: }
352:
353: if (writtenBytes < 0) {
354: if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
355: writtenBytes = 0;
356: } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
357: writtenBytes = 0;
358: } else {
359: throwException(writtenBytes);
360: }
361: }
362: return writtenBytes;
363: }
364:
365: @Override
366: protected int transferFile(AprSession session, FileRegion region,
367: int length) throws Exception {
368: throw new UnsupportedOperationException();
369: }
370:
371: private void throwException(int code) throws IOException {
372: throw new IOException(org.apache.tomcat.jni.Error
373: .strerror(-code)
374: + " (code: " + code + ")");
375: }
376: }
|