001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.protocol.transport;
005:
006: import com.tc.logging.TCLogger;
007: import com.tc.logging.TCLogging;
008: import com.tc.net.core.TCConnection;
009: import com.tc.net.core.event.TCConnectionEvent;
010: import com.tc.net.protocol.NetworkStackID;
011: import com.tc.util.Assert;
012:
013: public class ServerMessageTransport extends MessageTransportBase {
014:
015: private static final TCLogger smtLogger = TCLogging
016: .getLogger(ServerMessageTransport.class);
017:
018: public ServerMessageTransport(ConnectionID connectionID,
019: TransportHandshakeErrorHandler handshakeErrorHandler,
020: TransportHandshakeMessageFactory messageFactory) {
021: super (MessageTransportState.STATE_RESTART,
022: handshakeErrorHandler, messageFactory, true, smtLogger);
023: this .connectionId = connectionID;
024: }
025:
026: /**
027: * Constructor for when you want a transport that you can specify a connection
028: * (e.g., in a server). This constructor will create an open MessageTransport
029: * ready for use.
030: */
031: public ServerMessageTransport(ConnectionID connectionId,
032: TCConnection conn,
033: TransportHandshakeErrorHandler handshakeErrorHandler,
034: TransportHandshakeMessageFactory messageFactory) {
035: super (MessageTransportState.STATE_START, handshakeErrorHandler,
036: messageFactory, true, smtLogger);
037: this .connectionId = connectionId;
038: Assert.assertNotNull(conn);
039: wireNewConnection(conn);
040: }
041:
042: protected ConnectionAttacher getConnectionAttacher() {
043: if (this .status.isRestart()) {
044: return new RestartConnectionAttacher();
045: } else
046: return super .getConnectionAttacher();
047: }
048:
049: public NetworkStackID open() {
050: throw new UnsupportedOperationException(
051: "Server transport doesn't support open()");
052: }
053:
054: protected void receiveTransportMessageImpl(
055: WireProtocolMessage message) {
056: synchronized (status) {
057: if (status.isStart()) {
058: verifyAndHandleAck(message);
059: message.recycle();
060: return;
061: } else if (verifyHandshakeMessage(message)) {
062: handleHandshakeError(
063: new TransportHandshakeErrorContext(
064: "Unexpected handshake message in state: "
065: + status),
066: (TransportHandshakeMessage) message);
067: return;
068: }
069: }
070: super .receiveToReceiveLayer(message);
071: }
072:
073: private void verifyAndHandleAck(WireProtocolMessage message) {
074: if (!verifyAck(message)) {
075: handleHandshakeError(new TransportHandshakeErrorContext(
076: "Expected an ACK message but received: " + message));
077: } else {
078: handleAck((TransportHandshakeMessage) message);
079: }
080: }
081:
082: private void handleAck(TransportHandshakeMessage ack) {
083: synchronized (status) {
084: Assert.eval(status.isStart());
085: Assert.eval("Wrong connection ID: [" + this .connectionId
086: + "] != [" + ack.getConnectionId() + "]",
087: this .connectionId.equals(ack.getConnectionId()));
088: status.established();
089: fireTransportConnectedEvent();
090: }
091: }
092:
093: private boolean verifyAck(WireProtocolMessage message) {
094: return message instanceof TransportHandshakeMessage
095: && ((TransportHandshakeMessage) message).isAck();
096: }
097:
098: private boolean verifyHandshakeMessage(WireProtocolMessage message) {
099: return message instanceof TransportHandshakeMessage;
100: }
101:
102: private final class RestartConnectionAttacher implements
103: ConnectionAttacher {
104:
105: public void attachNewConnection(TCConnectionEvent closeEvent,
106: TCConnection oldConnection, TCConnection newConnection) {
107: Assert.assertNull(oldConnection);
108: wireNewConnection(newConnection);
109: }
110:
111: }
112:
113: }
|