001: package org.apache.mina.transport.socket.apr;
002:
003: import java.io.IOException;
004: import java.net.InetSocketAddress;
005: import java.net.SocketAddress;
006: import java.util.Iterator;
007: import java.util.List;
008: import java.util.concurrent.Executor;
009:
010: import org.apache.mina.common.AbstractPollingIoAcceptor;
011: import org.apache.mina.common.IoProcessor;
012: import org.apache.mina.common.RuntimeIoException;
013: import org.apache.mina.common.TransportMetadata;
014: import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
015: import org.apache.mina.transport.socket.SocketAcceptor;
016: import org.apache.mina.transport.socket.SocketSessionConfig;
017: import org.apache.mina.util.CircularQueue;
018: import org.apache.tomcat.jni.Address;
019: import org.apache.tomcat.jni.Poll;
020: import org.apache.tomcat.jni.Pool;
021: import org.apache.tomcat.jni.Socket;
022: import org.apache.tomcat.jni.Status;
023:
024: public final class AprSocketAcceptor extends
025: AbstractPollingIoAcceptor<AprSession, Long> implements
026: SocketAcceptor {
027:
028: private static final int POLLSET_SIZE = 1024;
029:
030: private final Object wakeupLock = new Object();
031: private long wakeupSocket;
032: private volatile boolean toBeWakenUp;
033:
034: private int backlog = 50;
035: private boolean reuseAddress = true;
036:
037: private volatile long pool;
038: private volatile long pollset; // socket poller
039: private final long[] polledSockets = new long[POLLSET_SIZE << 1];
040: private final List<Long> polledHandles = new CircularQueue<Long>(
041: POLLSET_SIZE);
042:
/**
 * Creates an acceptor configured with a fresh
 * {@link DefaultSocketSessionConfig}, passing {@link AprIoProcessor} as the
 * processor type to the superclass.
 */
public AprSocketAcceptor() {
    super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
}

/**
 * Creates an acceptor configured with a fresh
 * {@link DefaultSocketSessionConfig}, passing {@link AprIoProcessor} as the
 * processor type to the superclass.
 *
 * @param processorCount forwarded unchanged to the superclass constructor
 */
public AprSocketAcceptor(int processorCount) {
    super(
            new DefaultSocketSessionConfig(),
            AprIoProcessor.class,
            processorCount);
}

/**
 * Creates an acceptor configured with a fresh
 * {@link DefaultSocketSessionConfig} that uses the supplied processor.
 *
 * @param processor forwarded unchanged to the superclass constructor
 */
public AprSocketAcceptor(IoProcessor<AprSession> processor) {
    super(new DefaultSocketSessionConfig(), processor);
}

/**
 * Creates an acceptor configured with a fresh
 * {@link DefaultSocketSessionConfig} that uses the supplied executor and
 * processor.
 *
 * @param executor  forwarded unchanged to the superclass constructor
 * @param processor forwarded unchanged to the superclass constructor
 */
public AprSocketAcceptor(
        Executor executor, IoProcessor<AprSession> processor) {
    super(new DefaultSocketSessionConfig(), executor, processor);
}
060:
061: @Override
062: protected AprSession accept(IoProcessor<AprSession> processor,
063: Long handle) throws Exception {
064: long s = Socket.accept(handle);
065: boolean success = false;
066: try {
067: AprSession result = new AprSocketSession(this , processor, s);
068: success = true;
069: return result;
070: } finally {
071: if (!success) {
072: Socket.close(s);
073: }
074: }
075: }
076:
077: @Override
078: protected Long open(SocketAddress localAddress) throws Exception {
079: InetSocketAddress la = (InetSocketAddress) localAddress;
080: long handle = Socket.create(Socket.APR_INET,
081: Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
082:
083: boolean success = false;
084: try {
085: Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
086: Socket.timeoutSet(handle, 0);
087:
088: // Configure the server socket,
089: Socket.optSet(handle, Socket.APR_SO_REUSEADDR,
090: isReuseAddress() ? 1 : 0);
091: Socket.optSet(handle, Socket.APR_SO_RCVBUF,
092: getSessionConfig().getReceiveBufferSize());
093:
094: // and bind.
095: long sa;
096: if (la != null) {
097: if (la.getAddress() == null) {
098: sa = Address.info(Address.APR_ANYADDR,
099: Socket.APR_INET, la.getPort(), 0, pool);
100: } else {
101: sa = Address.info(la.getAddress().getHostAddress(),
102: Socket.APR_INET, la.getPort(), 0, pool);
103: }
104: } else {
105: sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET,
106: 0, 0, pool);
107: }
108:
109: int result = Socket.bind(handle, sa);
110: if (result != Status.APR_SUCCESS) {
111: throwException(result);
112: }
113: result = Socket.listen(handle, getBacklog());
114: if (result != Status.APR_SUCCESS) {
115: throwException(result);
116: }
117:
118: result = Poll.add(pollset, handle, Poll.APR_POLLIN);
119: if (result != Status.APR_SUCCESS) {
120: throwException(result);
121: }
122: success = true;
123: } finally {
124: if (!success) {
125: close(handle);
126: }
127: }
128: return handle;
129: }
130:
131: @Override
132: protected void init() throws Exception {
133: wakeupSocket = Socket.create(Socket.APR_INET,
134: Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, AprLibrary
135: .getInstance().getRootPool());
136:
137: // initialize a memory pool for APR functions
138: pool = Pool.create(AprLibrary.getInstance().getRootPool());
139:
140: pollset = Poll.create(POLLSET_SIZE, pool,
141: Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
142:
143: if (pollset <= 0) {
144: pollset = Poll.create(62, pool,
145: Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
146: }
147:
148: if (pollset <= 0) {
149: if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
150: throw new RuntimeIoException(
151: "Thread-safe pollset is not supported in this platform.");
152: }
153: }
154: }
155:
156: @Override
157: protected void destroy() throws Exception {
158: if (wakeupSocket > 0) {
159: Socket.close(wakeupSocket);
160: }
161: if (pollset > 0) {
162: Poll.destroy(pollset);
163: }
164: if (pool > 0) {
165: Pool.destroy(pool);
166: }
167: }
168:
169: @Override
170: protected SocketAddress localAddress(Long handle) throws Exception {
171: long la = Address.get(Socket.APR_LOCAL, handle);
172: return new InetSocketAddress(Address.getip(la), Address
173: .getInfo(la).port);
174: }
175:
176: @Override
177: protected boolean select() throws Exception {
178: int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets,
179: false);
180: if (rv <= 0) {
181: if (rv != -120001) {
182: throwException(rv);
183: }
184:
185: rv = Poll.maintain(pollset, polledSockets, true);
186: if (rv > 0) {
187: for (int i = 0; i < rv; i++) {
188: Poll
189: .add(pollset, polledSockets[i],
190: Poll.APR_POLLIN);
191: }
192: } else if (rv < 0) {
193: throwException(rv);
194: }
195:
196: return false;
197: } else {
198: rv <<= 1;
199: if (!polledHandles.isEmpty()) {
200: polledHandles.clear();
201: }
202:
203: for (int i = 0; i < rv; i++) {
204: long flag = polledSockets[i];
205: long socket = polledSockets[++i];
206: if (socket == wakeupSocket) {
207: synchronized (wakeupLock) {
208: Poll.remove(pollset, wakeupSocket);
209: toBeWakenUp = false;
210: }
211: continue;
212: }
213:
214: if ((flag & Poll.APR_POLLIN) != 0) {
215: polledHandles.add(socket);
216: }
217: }
218: return !polledHandles.isEmpty();
219: }
220: }
221:
222: @Override
223: protected Iterator<Long> selectedHandles() {
224: return polledHandles.iterator();
225: }
226:
227: @Override
228: protected void close(Long handle) throws Exception {
229: Poll.remove(pollset, handle);
230: int result = Socket.close(handle);
231: if (result != Status.APR_SUCCESS) {
232: throwException(result);
233: }
234: }
235:
236: @Override
237: protected void wakeup() {
238: if (toBeWakenUp) {
239: return;
240: }
241:
242: // Add a dummy socket to the pollset.
243: synchronized (wakeupLock) {
244: toBeWakenUp = true;
245: Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
246: }
247: }
248:
249: public int getBacklog() {
250: return backlog;
251: }
252:
253: public boolean isReuseAddress() {
254: return reuseAddress;
255: }
256:
257: public void setBacklog(int backlog) {
258: synchronized (bindLock) {
259: if (isActive()) {
260: throw new IllegalStateException(
261: "backlog can't be set while the acceptor is bound.");
262: }
263:
264: this .backlog = backlog;
265: }
266: }
267:
268: @Override
269: public InetSocketAddress getLocalAddress() {
270: return (InetSocketAddress) super .getLocalAddress();
271: }
272:
273: @Override
274: public InetSocketAddress getDefaultLocalAddress() {
275: return (InetSocketAddress) super .getDefaultLocalAddress();
276: }
277:
278: public void setDefaultLocalAddress(InetSocketAddress localAddress) {
279: super .setDefaultLocalAddress(localAddress);
280: }
281:
282: public void setReuseAddress(boolean reuseAddress) {
283: synchronized (bindLock) {
284: if (isActive()) {
285: throw new IllegalStateException(
286: "backlog can't be set while the acceptor is bound.");
287: }
288:
289: this .reuseAddress = reuseAddress;
290: }
291: }
292:
293: public TransportMetadata getTransportMetadata() {
294: return AprSocketSession.METADATA;
295: }
296:
297: @Override
298: public SocketSessionConfig getSessionConfig() {
299: return (SocketSessionConfig) super .getSessionConfig();
300: }
301:
302: private void throwException(int code) throws IOException {
303: throw new IOException(org.apache.tomcat.jni.Error
304: .strerror(-code)
305: + " (code: " + code + ")");
306: }
307: }
|