001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.transport.vmpipe;
021:
022: import java.io.IOException;
023: import java.net.SocketAddress;
024: import java.util.HashSet;
025: import java.util.Set;
026:
027: import org.apache.mina.common.AbstractIoConnector;
028: import org.apache.mina.common.ConnectFuture;
029: import org.apache.mina.common.DefaultConnectFuture;
030: import org.apache.mina.common.ExceptionMonitor;
031: import org.apache.mina.common.IdleStatusChecker;
032: import org.apache.mina.common.IoFilterChain;
033: import org.apache.mina.common.IoFuture;
034: import org.apache.mina.common.IoFutureListener;
035: import org.apache.mina.common.IoHandler;
036: import org.apache.mina.common.IoSessionInitializer;
037: import org.apache.mina.common.TransportMetadata;
038:
039: /**
040: * Connects to {@link IoHandler}s which is bound on the specified
041: * {@link VmPipeAddress}.
042: *
043: * @author The Apache MINA Project (dev@mina.apache.org)
044: * @version $Rev: 607163 $, $Date: 2007-12-27 20:20:07 -0700 (Thu, 27 Dec 2007) $
045: */
046: public final class VmPipeConnector extends AbstractIoConnector {
047:
048: /**
049: * Creates a new instance.
050: */
051: public VmPipeConnector() {
052: super (new DefaultVmPipeSessionConfig());
053: }
054:
055: public TransportMetadata getTransportMetadata() {
056: return VmPipeSessionImpl.METADATA;
057: }
058:
059: @Override
060: public VmPipeSessionConfig getSessionConfig() {
061: return (VmPipeSessionConfig) super .getSessionConfig();
062: }
063:
064: @Override
065: protected ConnectFuture connect0(
066: SocketAddress remoteAddress,
067: SocketAddress localAddress,
068: IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
069: VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
070: if (entry == null) {
071: return DefaultConnectFuture
072: .newFailedFuture(new IOException(
073: "Endpoint unavailable: " + remoteAddress));
074: }
075:
076: DefaultConnectFuture future = new DefaultConnectFuture();
077:
078: // Assign the local address dynamically,
079: VmPipeAddress actualLocalAddress;
080: try {
081: actualLocalAddress = nextLocalAddress();
082: } catch (IOException e) {
083: return DefaultConnectFuture.newFailedFuture(e);
084: }
085:
086: VmPipeSessionImpl localSession = new VmPipeSessionImpl(this ,
087: getListeners(), actualLocalAddress, getHandler(), entry);
088:
089: finishSessionInitialization(localSession, future,
090: sessionInitializer);
091:
092: // and reclaim the local address when the connection is closed.
093: localSession.getCloseFuture().addListener(
094: LOCAL_ADDRESS_RECLAIMER);
095:
096: // initialize connector session
097: try {
098: IoFilterChain filterChain = localSession.getFilterChain();
099: this .getFilterChainBuilder().buildFilterChain(filterChain);
100:
101: // The following sentences don't throw any exceptions.
102: getListeners().fireSessionCreated(localSession);
103: IdleStatusChecker.getInstance().addSession(localSession);
104: } catch (Throwable t) {
105: future.setException(t);
106: return future;
107: }
108:
109: // initialize acceptor session
110: VmPipeSessionImpl remoteSession = localSession
111: .getRemoteSession();
112: ((VmPipeAcceptor) remoteSession.getService())
113: .doFinishSessionInitialization(remoteSession, null);
114: try {
115: IoFilterChain filterChain = remoteSession.getFilterChain();
116: entry.getAcceptor().getFilterChainBuilder()
117: .buildFilterChain(filterChain);
118:
119: // The following sentences don't throw any exceptions.
120: entry.getListeners().fireSessionCreated(remoteSession);
121: IdleStatusChecker.getInstance().addSession(remoteSession);
122: } catch (Throwable t) {
123: ExceptionMonitor.getInstance().exceptionCaught(t);
124: remoteSession.close();
125: }
126:
127: // Start chains, and then allow and messages read/written to be processed. This is to ensure that
128: // sessionOpened gets received before a messageReceived
129: ((VmPipeFilterChain) localSession.getFilterChain()).start();
130: ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
131:
132: return future;
133: }
134:
135: @Override
136: protected IoFuture dispose0() throws Exception {
137: return null;
138: }
139:
140: private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();
141:
142: private static int nextLocalPort = -1;
143:
144: private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();
145:
146: private static VmPipeAddress nextLocalAddress() throws IOException {
147: synchronized (TAKEN_LOCAL_ADDRESSES) {
148: if (nextLocalPort >= 0) {
149: nextLocalPort = -1;
150: }
151: for (int i = 0; i < Integer.MAX_VALUE; i++) {
152: VmPipeAddress answer = new VmPipeAddress(
153: nextLocalPort--);
154: if (!TAKEN_LOCAL_ADDRESSES.contains(answer)) {
155: TAKEN_LOCAL_ADDRESSES.add(answer);
156: return answer;
157: }
158: }
159: }
160:
161: throw new IOException("Can't assign a local VM pipe port.");
162: }
163:
164: private static class LocalAddressReclaimer implements
165: IoFutureListener<IoFuture> {
166: public void operationComplete(IoFuture future) {
167: synchronized (TAKEN_LOCAL_ADDRESSES) {
168: TAKEN_LOCAL_ADDRESSES.remove(future.getSession()
169: .getLocalAddress());
170: }
171: }
172: }
173: }
|