001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.transport;
006:
007: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
008:
009: import com.tc.exception.ImplementMe;
010: import com.tc.exception.TCInternalError;
011: import com.tc.exception.TCRuntimeException;
012: import com.tc.logging.TCLogging;
013: import com.tc.net.MaxConnectionsExceededException;
014: import com.tc.net.core.TCConnection;
015: import com.tc.net.core.event.TCConnectionEvent;
016: import com.tc.net.protocol.NetworkStackID;
017: import com.tc.net.protocol.TCNetworkMessage;
018: import com.tc.net.protocol.TCProtocolAdaptor;
019: import com.tc.util.Assert;
020: import com.tc.util.TCTimeoutException;
021: import com.tc.util.concurrent.TCExceptionResultException;
022: import com.tc.util.concurrent.TCFuture;
023:
024: import java.io.IOException;
025: import java.util.List;
026:
027: /**
028: * Client implementation of the transport network layer.
029: */
030: public class ClientMessageTransport extends MessageTransportBase {
031: // it was 2 minutes timeout, reduce to 10 sec
032: private static final long SYN_ACK_TIMEOUT = 10000;
033: private final ClientConnectionEstablisher connectionEstablisher;
034: private boolean wasOpened = false;
035: private TCFuture waitForSynAckResult;
036: private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory;
037: private final SynchronizedBoolean isOpening = new SynchronizedBoolean(
038: false);
039:
040: /**
041: * Constructor for when you want a transport that isn't connected yet (e.g., in a client). This constructor will
042: * create an unopened MessageTransport.
043: *
044: * @param commsManager CommmunicationsManager
045: */
046: public ClientMessageTransport(
047: ClientConnectionEstablisher clientConnectionEstablisher,
048: TransportHandshakeErrorHandler handshakeErrorHandler,
049: TransportHandshakeMessageFactory messageFactory,
050: WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {
051:
052: super (MessageTransportState.STATE_START, handshakeErrorHandler,
053: messageFactory, false, TCLogging
054: .getLogger(ClientMessageTransport.class));
055: this .wireProtocolAdaptorFactory = wireProtocolAdaptorFactory;
056: this .connectionEstablisher = clientConnectionEstablisher;
057: }
058:
059: /**
060: * Blocking open. Causes a connection to be made. Will throw exceptions if the connect fails.
061: *
062: * @throws TCTimeoutException
063: * @throws IOException
064: * @throws TCTimeoutException
065: * @throws MaxConnectionsExceededException
066: */
067: public NetworkStackID open() throws TCTimeoutException,
068: IOException, MaxConnectionsExceededException {
069: // XXX: This extra boolean flag is dumb, but it's here because the close event can show up
070: // while the lock on isOpen is held here. That will cause a deadlock because the close event is thrown on the
071: // comms thread which means that the handshake messages can't be sent.
072: // The state machine here needs to be rationalized.
073: isOpening.set(true);
074: synchronized (isOpen) {
075: Assert.eval("can't open an already open transport", !isOpen
076: .get());
077: try {
078: wireNewConnection(connectionEstablisher.open(this ));
079: HandshakeResult result = handShake();
080: if (result.isMaxConnectionsExceeded()) {
081: // Hack to make the connection clear
082: // but don't do all the gunk around reconnect
083: // clean this up
084: List tl = this .getTransportListeners();
085: this .removeTransportListeners();
086: clearConnection();
087: this .addTransportListeners(tl);
088: status.reset();
089: throw new MaxConnectionsExceededException(
090: "Maximum number of client connections exceeded: "
091: + result.maxConnections());
092: }
093: Assert.eval(!this .connectionId.isNull());
094: isOpen.set(true);
095: sendAck();
096: NetworkStackID nid = new NetworkStackID(
097: this .connectionId.getChannelID());
098: wasOpened = true;
099: return (nid);
100: } catch (TCTimeoutException e) {
101: status.reset();
102: throw e;
103: } catch (IOException e) {
104: status.reset();
105: throw e;
106: } finally {
107: isOpening.set(false);
108: }
109: }
110: }
111:
112: /**
113: * Returns true if the MessageTransport was ever in an open state.
114: */
115: public boolean wasOpened() {
116: return wasOpened;
117: }
118:
119: public boolean isNotOpen() {
120: return !isOpening.get() && !isOpen.get();
121: }
122:
123: // TODO :: come back
124: public void closeEvent(TCConnectionEvent event) {
125: if (isNotOpen())
126: return;
127: status.reset();
128: super .closeEvent(event);
129: }
130:
131: protected void receiveTransportMessageImpl(
132: WireProtocolMessage message) {
133: synchronized (status) {
134: if (status.isSynSent()) {
135: handleSynAck(message);
136: message.recycle();
137: return;
138: }
139: }
140: super .receiveToReceiveLayer(message);
141: }
142:
143: private void handleSynAck(WireProtocolMessage message) {
144: if (!verifySynAck(message)) {
145: handleHandshakeError(new TransportHandshakeErrorContext(
146: "Received a message that was not a SYN_ACK while waiting for SYN_ACK: "
147: + message));
148: } else {
149: SynAckMessage synAck = (SynAckMessage) message;
150: if (synAck.hasErrorContext()) {
151: throw new ImplementMe(synAck.getErrorContext());
152: }
153:
154: if (connectionId != null
155: && !ConnectionID.NULL_ID.equals(connectionId)) {
156: // This is a reconnect
157: Assert.eval(connectionId.equals(synAck
158: .getConnectionId()));
159: }
160: if (!synAck.isMaxConnectionsExceeded()) {
161: this .connectionId = synAck.getConnectionId();
162:
163: Assert.assertNotNull(
164: "Connection id from the server was null!",
165: this .connectionId);
166: Assert.eval(!ConnectionID.NULL_ID
167: .equals(this .connectionId));
168: Assert.assertNotNull(this .waitForSynAckResult);
169: }
170: this .waitForSynAckResult.set(synAck);
171: }
172:
173: return;
174: }
175:
176: private boolean verifySynAck(TCNetworkMessage message) {
177: // XXX: yuck.
178: return message instanceof TransportHandshakeMessage
179: && ((TransportHandshakeMessage) message).isSynAck();
180: }
181:
182: /**
183: * Builds a protocol stack and tries to make a connection. This is a blocking call.
184: *
185: * @throws TCTimeoutException
186: * @throws MaxConnectionsExceededException
187: * @throws IOException
188: */
189: HandshakeResult handShake() throws TCTimeoutException {
190: sendSyn();
191: SynAckMessage synAck = waitForSynAck();
192: return new HandshakeResult(synAck.isMaxConnectionsExceeded(),
193: synAck.getMaxConnections());
194: }
195:
196: private SynAckMessage waitForSynAck() throws TCTimeoutException {
197: try {
198: SynAckMessage synAck = (SynAckMessage) waitForSynAckResult
199: .get(SYN_ACK_TIMEOUT);
200: return synAck;
201: } catch (InterruptedException e) {
202: throw new TCRuntimeException(e);
203: } catch (TCExceptionResultException e) {
204: throw new TCInternalError(e);
205: }
206: }
207:
208: private void sendSyn() {
209: synchronized (status) {
210: if (status.isEstablished() || status.isSynSent()) {
211: throw new AssertionError(" ERROR !!! " + status);
212: }
213: waitForSynAckResult = new TCFuture(status);
214: TransportHandshakeMessage syn = this .messageFactory
215: .createSyn(this .connectionId, getConnection());
216: // send syn message
217: this .sendToConnection(syn);
218: this .status.synSent();
219: }
220: }
221:
222: private void sendAck() {
223: synchronized (status) {
224: Assert.eval(status.isSynSent());
225: TransportHandshakeMessage ack = this .messageFactory
226: .createAck(this .connectionId, getConnection());
227: // send ack message
228: this .sendToConnection(ack);
229: this .status.established();
230: fireTransportConnectedEvent();
231: }
232: }
233:
234: void reconnect(TCConnection connection) throws Exception {
235:
236: // don't do reconnect if open is still going on
237: if (!wasOpened)
238: return;
239:
240: Assert.eval(!isConnected());
241: wireNewConnection(connection);
242: try {
243: HandshakeResult result = handShake();
244: sendAck();
245: if (result.isMaxConnectionsExceeded()) {
246: close();
247: throw new MaxConnectionsExceededException(
248: getMaxConnectionsExceededMessage(result
249: .maxConnections()));
250: }
251: } catch (Exception t) {
252: status.reset();
253: throw t;
254: }
255: }
256:
257: private String getMaxConnectionsExceededMessage(int maxConnections) {
258: return "Maximum number of client connections exceeded: "
259: + maxConnections;
260: }
261:
262: TCProtocolAdaptor getProtocolAdapter() {
263: return wireProtocolAdaptorFactory
264: .newWireProtocolAdaptor(new WireProtocolMessageSink() {
265: public void putMessage(WireProtocolMessage message) {
266: receiveTransportMessage(message);
267: }
268: });
269: }
270:
271: void endIfDisconnected() {
272: synchronized (this .status) {
273: if (!this .isConnected() && !this .status.isEnd()) {
274: this .status.end();
275: }
276: }
277: }
278:
279: private static final class HandshakeResult {
280: private final boolean maxConnectionsExceeded;
281: private final int maxConnections;
282:
283: private HandshakeResult(boolean maxConnectionsExceeded,
284: int maxConnections) {
285: this .maxConnectionsExceeded = maxConnectionsExceeded;
286: this .maxConnections = maxConnections;
287: }
288:
289: public int maxConnections() {
290: return this .maxConnections;
291: }
292:
293: public boolean isMaxConnectionsExceeded() {
294: return this .maxConnectionsExceeded;
295: }
296: }
297:
298: public ClientConnectionEstablisher getConnectionEstablisher() {
299: return connectionEstablisher;
300: }
301: }
|