001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.transport.socket.apr;
021:
022: import java.io.IOException;
023: import java.net.InetSocketAddress;
024: import java.net.SocketAddress;
025: import java.nio.ByteBuffer;
026: import java.util.HashMap;
027: import java.util.HashSet;
028: import java.util.Iterator;
029: import java.util.List;
030: import java.util.Map;
031: import java.util.Set;
032: import java.util.concurrent.Executor;
033:
034: import org.apache.mina.common.AbstractPollingIoConnector;
035: import org.apache.mina.common.IoProcessor;
036: import org.apache.mina.common.RuntimeIoException;
037: import org.apache.mina.common.TransportMetadata;
038: import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
039: import org.apache.mina.transport.socket.SocketConnector;
040: import org.apache.mina.transport.socket.SocketSessionConfig;
041: import org.apache.mina.util.CircularQueue;
042: import org.apache.tomcat.jni.Address;
043: import org.apache.tomcat.jni.Poll;
044: import org.apache.tomcat.jni.Pool;
045: import org.apache.tomcat.jni.Socket;
046: import org.apache.tomcat.jni.Status;
047:
048: /**
049: * @author The Apache MINA Project (dev@mina.apache.org)
050: * @version $Rev: 600804 $, $Date: 2007-12-03 23:43:01 -0700 (Mon, 03 Dec 2007) $
051: */
052: public final class AprSocketConnector extends
053: AbstractPollingIoConnector<AprSession, Long> implements
054: SocketConnector {
055:
056: private static final int POLLSET_SIZE = 1024;
057:
058: private final Map<Long, ConnectionRequest> requests = new HashMap<Long, ConnectionRequest>(
059: POLLSET_SIZE);
060:
061: private final Object wakeupLock = new Object();
062: private long wakeupSocket;
063: private volatile boolean toBeWakenUp;
064:
065: private volatile long pool;
066: private volatile long pollset; // socket poller
067: private final long[] polledSockets = new long[POLLSET_SIZE << 1];
068: private final List<Long> polledHandles = new CircularQueue<Long>(
069: POLLSET_SIZE);
070: private final Set<Long> failedHandles = new HashSet<Long>(
071: POLLSET_SIZE);
072: private volatile ByteBuffer dummyBuffer;
073:
074: public AprSocketConnector() {
075: super (new DefaultSocketSessionConfig(), AprIoProcessor.class);
076: }
077:
078: public AprSocketConnector(int processorCount) {
079: super (new DefaultSocketSessionConfig(), AprIoProcessor.class,
080: processorCount);
081: }
082:
083: public AprSocketConnector(IoProcessor<AprSession> processor) {
084: super (new DefaultSocketSessionConfig(), processor);
085: }
086:
087: public AprSocketConnector(Executor executor,
088: IoProcessor<AprSession> processor) {
089: super (new DefaultSocketSessionConfig(), executor, processor);
090: }
091:
092: @Override
093: protected void init() throws Exception {
094: wakeupSocket = Socket.create(Socket.APR_INET,
095: Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, AprLibrary
096: .getInstance().getRootPool());
097:
098: // initialize a memory pool for APR functions
099: pool = Pool.create(AprLibrary.getInstance().getRootPool());
100: dummyBuffer = Pool.alloc(pool, 1);
101:
102: pollset = Poll.create(POLLSET_SIZE, pool,
103: Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
104:
105: if (pollset <= 0) {
106: pollset = Poll.create(62, pool,
107: Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
108: }
109:
110: if (pollset <= 0) {
111: if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
112: throw new RuntimeIoException(
113: "Thread-safe pollset is not supported in this platform.");
114: }
115: }
116: }
117:
118: @Override
119: protected void destroy() throws Exception {
120: if (wakeupSocket > 0) {
121: Socket.close(wakeupSocket);
122: }
123: if (pollset > 0) {
124: Poll.destroy(pollset);
125: }
126: if (pool > 0) {
127: Pool.destroy(pool);
128: }
129: }
130:
131: @Override
132: protected Iterator<Long> allHandles() {
133: return polledHandles.iterator();
134: }
135:
136: @Override
137: protected boolean connect(Long handle, SocketAddress remoteAddress)
138: throws Exception {
139: InetSocketAddress ra = (InetSocketAddress) remoteAddress;
140: long sa;
141: if (ra != null) {
142: if (ra.getAddress() == null) {
143: sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET,
144: ra.getPort(), 0, pool);
145: } else {
146: sa = Address.info(ra.getAddress().getHostAddress(),
147: Socket.APR_INET, ra.getPort(), 0, pool);
148: }
149: } else {
150: sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0,
151: 0, pool);
152: }
153:
154: int rv = Socket.connect(handle, sa);
155: if (rv == Status.APR_SUCCESS) {
156: return true;
157: }
158:
159: if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
160: return false;
161: }
162:
163: throwException(rv);
164: throw new InternalError(); // This sentence will never be executed.
165: }
166:
167: @Override
168: protected ConnectionRequest connectionRequest(Long handle) {
169: return requests.get(handle);
170: }
171:
172: @Override
173: protected void close(Long handle) throws Exception {
174: finishConnect(handle);
175: int rv = Socket.close(handle);
176: if (rv != Status.APR_SUCCESS) {
177: throwException(rv);
178: }
179: }
180:
181: @Override
182: protected boolean finishConnect(Long handle) throws Exception {
183: Poll.remove(pollset, handle);
184: requests.remove(handle);
185: if (failedHandles.remove(handle)) {
186: int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
187: throwException(rv);
188: throw new InternalError("Shouldn't reach here.");
189: }
190: return true;
191: }
192:
193: @Override
194: protected Long newHandle(SocketAddress localAddress)
195: throws Exception {
196: long handle = Socket.create(Socket.APR_INET,
197: Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
198: boolean success = false;
199: try {
200: Socket.optSet(handle, Socket.APR_SO_REUSEADDR, 1);
201:
202: if (localAddress != null) {
203: InetSocketAddress la = (InetSocketAddress) localAddress;
204: long sa;
205: if (la != null) {
206: if (la.getAddress() == null) {
207: sa = Address.info(Address.APR_ANYADDR,
208: Socket.APR_INET, la.getPort(), 0, pool);
209: } else {
210: sa = Address.info(la.getAddress()
211: .getHostAddress(), Socket.APR_INET, la
212: .getPort(), 0, pool);
213: }
214: } else {
215: sa = Address.info(Address.APR_ANYADDR,
216: Socket.APR_INET, 0, 0, pool);
217: }
218:
219: int result = Socket.bind(handle, sa);
220: if (result != Status.APR_SUCCESS) {
221: throwException(result);
222: }
223: }
224:
225: Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
226: success = true;
227: return handle;
228: } finally {
229: if (!success) {
230: int rv = Socket.close(handle);
231: if (rv != Status.APR_SUCCESS) {
232: throwException(rv);
233: }
234: }
235: }
236: }
237:
238: @Override
239: protected AprSession newSession(IoProcessor<AprSession> processor,
240: Long handle) throws Exception {
241: return new AprSocketSession(this , processor, handle);
242: }
243:
244: @Override
245: protected void register(Long handle, ConnectionRequest request)
246: throws Exception {
247: int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
248: if (rv != Status.APR_SUCCESS) {
249: throwException(rv);
250: }
251:
252: requests.put(handle, request);
253: }
254:
255: @Override
256: protected boolean select(int timeout) throws Exception {
257: int rv = Poll.poll(pollset, timeout * 1000, polledSockets,
258: false);
259: if (rv <= 0) {
260: if (rv != -120001) {
261: throwException(rv);
262: }
263:
264: rv = Poll.maintain(pollset, polledSockets, true);
265: if (rv > 0) {
266: for (int i = 0; i < rv; i++) {
267: Poll.add(pollset, polledSockets[i],
268: Poll.APR_POLLOUT);
269: }
270: } else if (rv < 0) {
271: throwException(rv);
272: }
273:
274: return false;
275: } else {
276: rv <<= 1;
277: if (!polledHandles.isEmpty()) {
278: polledHandles.clear();
279: }
280:
281: for (int i = 0; i < rv; i++) {
282: long flag = polledSockets[i];
283: long socket = polledSockets[++i];
284: if (socket == wakeupSocket) {
285: synchronized (wakeupLock) {
286: Poll.remove(pollset, wakeupSocket);
287: toBeWakenUp = false;
288: }
289: continue;
290: }
291: polledHandles.add(socket);
292: if ((flag & Poll.APR_POLLOUT) == 0) {
293: failedHandles.add(socket);
294: }
295: }
296: return !polledHandles.isEmpty();
297: }
298: }
299:
300: @Override
301: protected Iterator<Long> selectedHandles() {
302: return polledHandles.iterator();
303: }
304:
305: @Override
306: protected void wakeup() {
307: if (toBeWakenUp) {
308: return;
309: }
310:
311: // Add a dummy socket to the pollset.
312: synchronized (wakeupLock) {
313: toBeWakenUp = true;
314: Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
315: }
316: }
317:
318: public TransportMetadata getTransportMetadata() {
319: return AprSocketSession.METADATA;
320: }
321:
322: @Override
323: public SocketSessionConfig getSessionConfig() {
324: return (SocketSessionConfig) super .getSessionConfig();
325: }
326:
327: @Override
328: public InetSocketAddress getDefaultRemoteAddress() {
329: return (InetSocketAddress) super .getDefaultRemoteAddress();
330: }
331:
332: public void setDefaultRemoteAddress(
333: InetSocketAddress defaultRemoteAddress) {
334: super .setDefaultRemoteAddress(defaultRemoteAddress);
335: }
336:
337: private void throwException(int code) throws IOException {
338: throw new IOException(org.apache.tomcat.jni.Error
339: .strerror(-code)
340: + " (code: " + code + ")");
341: }
342: }
|