001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.transport;
006:
007: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
008: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
009:
010: import com.tc.bytes.TCByteBuffer;
011: import com.tc.logging.ConnectionIDProvider;
012: import com.tc.logging.TCLogger;
013: import com.tc.net.MaxConnectionsExceededException;
014: import com.tc.net.TCSocketAddress;
015: import com.tc.net.core.TCConnection;
016: import com.tc.net.core.event.TCConnectionErrorEvent;
017: import com.tc.net.core.event.TCConnectionEvent;
018: import com.tc.net.core.event.TCConnectionEventListener;
019: import com.tc.net.protocol.IllegalReconnectException;
020: import com.tc.net.protocol.NetworkLayer;
021: import com.tc.net.protocol.NetworkStackID;
022: import com.tc.net.protocol.TCNetworkMessage;
023: import com.tc.util.Assert;
024: import com.tc.util.TCTimeoutException;
025:
026: import java.io.IOException;
027:
028: /**
029: * Implementation of MessaageTransport
030: */
031: abstract class MessageTransportBase extends AbstractMessageTransport
032: implements NetworkLayer, TCConnectionEventListener,
033: ConnectionIDProvider {
034: private TCConnection connection;
035:
036: protected ConnectionID connectionId = ConnectionID.NULL_ID;
037: protected final MessageTransportStatus status;
038: protected final SynchronizedBoolean isOpen;
039: protected final TransportHandshakeMessageFactory messageFactory;
040: private final TransportHandshakeErrorHandler handshakeErrorHandler;
041: private NetworkLayer receiveLayer;
042:
043: private final Object attachingNewConnection = new Object();
044: private final SynchronizedRef connectionCloseEvent = new SynchronizedRef(
045: null);
046: private byte[] sourceAddress;
047: private int sourcePort;
048: private byte[] destinationAddress;
049: private int destinationPort;
050: private boolean allowConnectionReplace = false;
051:
052: protected MessageTransportBase(MessageTransportState initialState,
053: TransportHandshakeErrorHandler handshakeErrorHandler,
054: TransportHandshakeMessageFactory messageFactory,
055: boolean isOpen, TCLogger logger) {
056:
057: super (logger);
058: this .handshakeErrorHandler = handshakeErrorHandler;
059: this .messageFactory = messageFactory;
060: this .isOpen = new SynchronizedBoolean(isOpen);
061: this .status = new MessageTransportStatus(initialState, logger);
062: }
063:
064: public void setAllowConnectionReplace(boolean allow) {
065: this .allowConnectionReplace = allow;
066: }
067:
068: public final ConnectionID getConnectionId() {
069: return this .connectionId;
070: }
071:
072: public final void setReceiveLayer(NetworkLayer layer) {
073: this .receiveLayer = layer;
074: }
075:
076: public final void setSendLayer(NetworkLayer layer) {
077: throw new UnsupportedOperationException(
078: "Transport layer has no send layer.");
079: }
080:
081: public final void receiveTransportMessage(
082: WireProtocolMessage message) {
083: synchronized (attachingNewConnection) {
084: if (message.getSource() == this .connection) {
085: receiveTransportMessageImpl(message);
086: } else {
087: logger.warn("Received message from an old connection: "
088: + message);
089: }
090: }
091: }
092:
093: public abstract NetworkStackID open()
094: throws MaxConnectionsExceededException, TCTimeoutException,
095: IOException;
096:
097: protected abstract void receiveTransportMessageImpl(
098: WireProtocolMessage message);
099:
100: protected final void receiveToReceiveLayer(
101: WireProtocolMessage message) {
102: Assert.assertNotNull(receiveLayer);
103: if (message instanceof TransportHandshakeMessage) {
104: throw new AssertionError("Wrong handshake message from: "
105: + message.getSource());
106: }
107:
108: this .receiveLayer.receive(message.getPayload());
109: message.getWireProtocolHeader().recycle();
110: }
111:
112: public final void receive(TCByteBuffer[] msgData) {
113: throw new UnsupportedOperationException();
114: }
115:
116: /**
117: * Moves the MessageTransport state to closed and closes the underlying connection, if any.
118: */
119: public void close() {
120: synchronized (isOpen) {
121: if (!isOpen.get()) {
122: // see DEV-659: we used to throw an assertion error here if already closed
123: logger.warn("Can only close an open connection");
124: return;
125: }
126: isOpen.set(false);
127: fireTransportClosedEvent();
128: }
129:
130: synchronized (status) {
131: if (connection != null && !this .connection.isClosed()) {
132: this .connection.asynchClose();
133: }
134: }
135: }
136:
137: public final void send(TCNetworkMessage message) {
138: // synchronized (isOpen) {
139: // Assert.eval("Can't send on an unopen transport [" +
140: // Thread.currentThread().getName() + "]", isOpen.get());
141: // }
142:
143: synchronized (status) {
144: if (!status.isEstablished()) {
145: logger
146: .warn("Ignoring message sent to non-established transport: "
147: + message);
148: return;
149: }
150:
151: sendToConnection(message);
152: }
153: }
154:
155: public final void sendToConnection(TCNetworkMessage message) {
156: if (message == null)
157: throw new AssertionError("Attempt to send a null message.");
158: if (!(message instanceof WireProtocolMessage)) {
159: final TCNetworkMessage payload = message;
160:
161: message = WireProtocolMessageImpl.wrapMessage(message,
162: connection);
163: Assert.eval(message.getSentCallback() == null);
164:
165: final Runnable callback = payload.getSentCallback();
166: if (callback != null) {
167: message.setSentCallback(new Runnable() {
168: public void run() {
169: callback.run();
170: }
171: });
172: }
173: }
174:
175: WireProtocolHeader hdr = (WireProtocolHeader) message
176: .getHeader();
177:
178: hdr.setSourceAddress(getSourceAddress());
179: hdr.setSourcePort(getSourcePort());
180: hdr.setDestinationAddress(getDestinationAddress());
181: hdr.setDestinationPort(getDestinationPort());
182: hdr.computeChecksum();
183:
184: connection.putMessage(message);
185: }
186:
187: /**
188: * Returns true if the underlying connection is open.
189: */
190: public final boolean isConnected() {
191: synchronized (status) {
192: return this .status.isEstablished();
193: }
194: }
195:
196: public final void attachNewConnection(TCConnection newConnection)
197: throws IllegalReconnectException {
198: synchronized (attachingNewConnection) {
199: if ((this .connection != null) && !allowConnectionReplace) {
200: throw new IllegalReconnectException();
201: }
202:
203: getConnectionAttacher()
204: .attachNewConnection(
205: (TCConnectionEvent) this .connectionCloseEvent
206: .get(), this .connection,
207: newConnection);
208: }
209: }
210:
211: protected ConnectionAttacher getConnectionAttacher() {
212: return new DefaultConnectionAttacher(this );
213: }
214:
215: protected interface ConnectionAttacher {
216: public void attachNewConnection(TCConnectionEvent closeEvent,
217: TCConnection oldConnection, TCConnection newConnection);
218: }
219:
220: private static final class DefaultConnectionAttacher implements
221: ConnectionAttacher {
222:
223: private final MessageTransportBase transport;
224:
225: private DefaultConnectionAttacher(MessageTransportBase transport) {
226: this .transport = transport;
227: }
228:
229: public void attachNewConnection(TCConnectionEvent closeEvent,
230: TCConnection oldConnection, TCConnection newConnection) {
231: Assert.assertNotNull(oldConnection);
232: if (closeEvent == null
233: || closeEvent.getSource() != oldConnection) {
234: // We either didn't receive a close event or we received a close event
235: // from a connection that isn't our current connection.
236: this .transport.fireTransportDisconnectedEvent();
237: }
238: // remove the transport as a listener for the old connection
239: if (oldConnection != null
240: && oldConnection != transport.connection) {
241: oldConnection.removeListener(transport);
242: }
243: // set the new connection to the current connection.
244: transport.wireNewConnection(newConnection);
245: }
246: }
247:
248: /*********************************************************************************************************************
249: * TCConnection listener interface
250: */
251:
252: public void connectEvent(TCConnectionEvent event) {
253: return;
254: }
255:
256: public void closeEvent(TCConnectionEvent event) {
257: boolean isSameConnection = false;
258:
259: synchronized (attachingNewConnection) {
260: TCConnection src = event.getSource();
261: isSameConnection = (src == this .connection);
262: if (isSameConnection) {
263: this .connectionCloseEvent.set(event);
264: }
265: }
266:
267: if (isSameConnection) {
268: fireTransportDisconnectedEvent();
269: }
270: }
271:
272: public void errorEvent(TCConnectionErrorEvent errorEvent) {
273: return;
274: }
275:
276: public void endOfFileEvent(TCConnectionEvent event) {
277: return;
278: }
279:
280: protected void handleHandshakeError(TransportHandshakeErrorContext e) {
281: this .handshakeErrorHandler.handleHandshakeError(e);
282: }
283:
284: protected void handleHandshakeError(
285: TransportHandshakeErrorContext e,
286: TransportHandshakeMessage m) {
287: this .handshakeErrorHandler.handleHandshakeError(e, m);
288: }
289:
290: protected TCConnection getConnection() {
291: return connection;
292: }
293:
294: public TCSocketAddress getRemoteAddress() {
295: return this .connection.getRemoteAddress();
296: }
297:
298: public TCSocketAddress getLocalAddress() {
299: return this .connection.getLocalAddress();
300: }
301:
302: protected void setConnection(TCConnection conn) {
303: TCConnection old = this .connection;
304: this .connection = conn;
305: clearAddressCache();
306: this .connection.addListener(this );
307: if (old != null) {
308: old.removeListener(this );
309: }
310: }
311:
312: protected void clearConnection() {
313: getConnection().close(10000);
314: this .connectionId = ConnectionID.NULL_ID;
315: this .connection.removeListener(this );
316: clearAddressCache();
317: this .connection = null;
318: }
319:
320: private void clearAddressCache() {
321: this .sourceAddress = null;
322: this .sourcePort = -1;
323: this .destinationAddress = null;
324: this .destinationPort = -1;
325: }
326:
327: private byte[] getSourceAddress() {
328: if (sourceAddress == null) {
329: return sourceAddress = connection.getLocalAddress()
330: .getAddressBytes();
331: }
332: return sourceAddress;
333: }
334:
335: private byte[] getDestinationAddress() {
336: if (destinationAddress == null) {
337: return destinationAddress = connection.getRemoteAddress()
338: .getAddressBytes();
339: }
340: return destinationAddress;
341: }
342:
343: private int getSourcePort() {
344: if (sourcePort == -1) {
345: return this .sourcePort = connection.getLocalAddress()
346: .getPort();
347: }
348: return sourcePort;
349: }
350:
351: private int getDestinationPort() {
352: if (destinationPort == -1) {
353: return this .destinationPort = connection.getRemoteAddress()
354: .getPort();
355: }
356: return sourcePort;
357: }
358:
359: protected void wireNewConnection(TCConnection conn) {
360: logger.info("Attaching new connection: " + conn);
361: setConnection(conn);
362: this.status.reset();
363: }
364: }
|