001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.transport;
006:
007: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
008:
009: import com.tc.logging.CustomerLogging;
010: import com.tc.logging.TCLogger;
011: import com.tc.net.core.TCConnection;
012: import com.tc.net.protocol.IllegalReconnectException;
013: import com.tc.net.protocol.NetworkStackHarness;
014: import com.tc.net.protocol.NetworkStackHarnessFactory;
015: import com.tc.net.protocol.ProtocolAdaptorFactory;
016: import com.tc.net.protocol.StackNotFoundException;
017: import com.tc.net.protocol.TCProtocolAdaptor;
018: import com.tc.net.protocol.tcm.ServerMessageChannelFactory;
019: import com.tc.util.Assert;
020:
021: import java.util.ArrayList;
022: import java.util.Iterator;
023: import java.util.List;
024: import java.util.Map;
025: import java.util.Set;
026:
027: /**
028: * Provides network stacks on the server side
029: */
030: public class ServerStackProvider implements NetworkStackProvider,
031: MessageTransportListener, ProtocolAdaptorFactory {
032:
033: private final Map harnesses = new ConcurrentHashMap();
034: private final NetworkStackHarnessFactory harnessFactory;
035: private final ServerMessageChannelFactory channelFactory;
036: private final TransportHandshakeMessageFactory handshakeMessageFactory;
037: private final ConnectionIDFactory connectionIdFactory;
038: private final ConnectionPolicy connectionPolicy;
039: private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory;
040: private final WireProtocolMessageSink wireProtoMsgsink;
041: private final MessageTransportFactory messageTransportFactory;
042: private final List transportListeners = new ArrayList(1);
043: private final TCLogger logger;
044: private final TCLogger consoleLogger = CustomerLogging
045: .getConsoleLogger();
046:
047: public ServerStackProvider(TCLogger logger,
048: Set initialConnectionIDs,
049: NetworkStackHarnessFactory harnessFactory,
050: ServerMessageChannelFactory channelFactory,
051: MessageTransportFactory messageTransportFactory,
052: TransportHandshakeMessageFactory handshakeMessageFactory,
053: ConnectionIDFactory connectionIdFactory,
054: ConnectionPolicy connectionPolicy,
055: WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {
056: this (logger, initialConnectionIDs, harnessFactory,
057: channelFactory, messageTransportFactory,
058: handshakeMessageFactory, connectionIdFactory,
059: connectionPolicy, wireProtocolAdaptorFactory, null);
060: }
061:
062: public ServerStackProvider(TCLogger logger,
063: Set initialConnectionIDs,
064: NetworkStackHarnessFactory harnessFactory,
065: ServerMessageChannelFactory channelFactory,
066: MessageTransportFactory messageTransportFactory,
067: TransportHandshakeMessageFactory handshakeMessageFactory,
068: ConnectionIDFactory connectionIdFactory,
069: ConnectionPolicy connectionPolicy,
070: WireProtocolAdaptorFactory wireProtocolAdaptorFactory,
071: WireProtocolMessageSink wireProtoMsgSink) {
072: this .messageTransportFactory = messageTransportFactory;
073: this .connectionPolicy = connectionPolicy;
074: this .wireProtocolAdaptorFactory = wireProtocolAdaptorFactory;
075: this .wireProtoMsgsink = wireProtoMsgSink;
076: Assert.assertNotNull(harnessFactory);
077: this .harnessFactory = harnessFactory;
078: this .channelFactory = channelFactory;
079: this .handshakeMessageFactory = handshakeMessageFactory;
080: this .connectionIdFactory = connectionIdFactory;
081: this .transportListeners.add(this );
082: this .logger = logger;
083: for (Iterator i = initialConnectionIDs.iterator(); i.hasNext();) {
084: ConnectionID connectionID = (ConnectionID) i.next();
085: logger
086: .info("Preparing comms stack for previously connected client: "
087: + connectionID);
088: newStackHarness(connectionID,
089: messageTransportFactory
090: .createNewTransport(connectionID,
091: createHandshakeErrorHandler(),
092: handshakeMessageFactory,
093: transportListeners));
094: }
095: }
096:
097: public MessageTransport attachNewConnection(
098: ConnectionID connectionId, TCConnection connection)
099: throws StackNotFoundException, IllegalReconnectException {
100: Assert.assertNotNull(connection);
101:
102: final NetworkStackHarness harness;
103: final MessageTransport rv;
104: if (connectionId.isNull()) {
105: connectionId = connectionIdFactory.nextConnectionId();
106:
107: rv = messageTransportFactory.createNewTransport(
108: connectionId, connection,
109: createHandshakeErrorHandler(),
110: handshakeMessageFactory, transportListeners);
111: newStackHarness(connectionId, rv);
112: } else {
113: harness = (NetworkStackHarness) harnesses.get(connectionId);
114:
115: if (harness == null) {
116: throw new StackNotFoundException(connectionId);
117: } else {
118: rv = harness.attachNewConnection(connection);
119: }
120: }
121: return rv;
122: }
123:
124: private void newStackHarness(ConnectionID id,
125: MessageTransport transport) {
126: final NetworkStackHarness harness;
127: harness = harnessFactory.createServerHarness(channelFactory,
128: transport, new MessageTransportListener[] { this });
129: harness.finalizeStack();
130: Object previous = harnesses.put(id, harness);
131: if (previous != null) {
132: throw new AssertionError("previous is " + previous);
133: }
134: }
135:
136: private TransportHandshakeErrorHandler createHandshakeErrorHandler() {
137: return new TransportHandshakeErrorHandler() {
138:
139: public void handleHandshakeError(
140: TransportHandshakeErrorContext e) {
141: consoleLogger.info(e.getMessage());
142: logger.info(e.getMessage());
143: }
144:
145: public void handleHandshakeError(
146: TransportHandshakeErrorContext e,
147: TransportHandshakeMessage m) {
148: logger.info(e.getMessage());
149: }
150:
151: };
152: }
153:
154: NetworkStackHarness removeNetworkStack(ConnectionID connectionId) {
155: return (NetworkStackHarness) harnesses.remove(connectionId);
156: }
157:
158: /*********************************************************************************************************************
159: * MessageTransportListener methods.
160: */
161: public void notifyTransportConnected(MessageTransport transport) {
162: // don't care
163: }
164:
165: /**
166: * A client disconnected.
167: */
168: public void notifyTransportDisconnected(MessageTransport transport) {
169: // Currenly we dont care about this event here. In AbstractMessageChannel in the server, this event closes the
170: // channel
171: // so effectively a disconnected transport means a closed channel in the server. When we later implement clients
172: // reconnect
173: // this will change and this will trigger a reconnect window for that client here.
174: }
175:
176: private void close(ConnectionID connectionId) {
177: NetworkStackHarness harness = removeNetworkStack(connectionId);
178: if (harness == null) {
179: throw new AssertionError(
180: "Receive a transport closed event for a transport that isn't in the map :"
181: + connectionId);
182: }
183: }
184:
185: public void notifyTransportConnectAttempt(MessageTransport transport) {
186: // don't care
187: }
188:
189: /**
190: * The connection was closed. The client is never allowed to reconnect. Removes stack associated with the given
191: * transport from the map of managed stacks.
192: */
193: public void notifyTransportClosed(MessageTransport transport) {
194: close(transport.getConnectionId());
195: this .connectionPolicy.clientDisconnected();
196: }
197:
198: /*********************************************************************************************************************
199: * ProtocolAdaptorFactory interface
200: */
201:
202: public TCProtocolAdaptor getInstance() {
203: if (wireProtoMsgsink != null) {
204: return this .wireProtocolAdaptorFactory
205: .newWireProtocolAdaptor(wireProtoMsgsink);
206: } else {
207: MessageSink sink = new MessageSink(
208: createHandshakeErrorHandler());
209: return this .wireProtocolAdaptorFactory
210: .newWireProtocolAdaptor(sink);
211: }
212: }
213:
214: /*********************************************************************************************************************
215: * private stuff
216: */
217:
218: class MessageSink implements WireProtocolMessageSink {
219: private final TransportHandshakeErrorHandler handshakeErrorHandler;
220: private volatile boolean isSynReceived = false;
221: private volatile boolean isHandshakeError = false;
222: private volatile MessageTransport transport;
223:
224: private MessageSink(
225: TransportHandshakeErrorHandler handshakeErrorHandler) {
226: this .handshakeErrorHandler = handshakeErrorHandler;
227: }
228:
229: public void putMessage(WireProtocolMessage message) {
230: if (!isSynReceived) {
231: synchronized (this ) {
232: if (!isSynReceived) {
233: isSynReceived = true;
234: verifyAndHandleSyn(message);
235: message.recycle();
236: return;
237: }
238: }
239: }
240: if (!isHandshakeError) {
241: this .transport.receiveTransportMessage(message);
242: }
243: }
244:
245: private void verifyAndHandleSyn(WireProtocolMessage message) {
246: if (!verifySyn(message)) {
247: handleHandshakeError(new TransportHandshakeErrorContext(
248: "Expected a SYN message but received: "
249: + message));
250: } else {
251: try {
252: handleSyn((SynMessage) message);
253: } catch (StackNotFoundException e) {
254: handleHandshakeError(new TransportHandshakeErrorContext(
255: "Unable to find communications stack. "
256: + e.getMessage()
257: + ". This is usually caused by a client from a prior run trying to illegally reconnect to the server."
258: + " While that client is being rejected, everything else should proceed as normal. ",
259: e));
260: }
261: }
262: }
263:
264: private void handleHandshakeError(
265: TransportHandshakeErrorContext ctxt) {
266: this .isHandshakeError = true;
267: this .handshakeErrorHandler.handleHandshakeError(ctxt);
268: }
269:
270: private void handleSyn(SynMessage syn)
271: throws StackNotFoundException {
272: ConnectionID connectionId = syn.getConnectionId();
273:
274: if (connectionId == null) {
275: sendSynAck(connectionId,
276: new TransportHandshakeErrorContext(
277: "Invalid connection id: "
278: + connectionId), syn
279: .getSource());
280: this .isHandshakeError = true;
281: return;
282: }
283:
284: try {
285: this .transport = attachNewConnection(connectionId, syn
286: .getSource());
287: } catch (IllegalReconnectException e) {
288: logger
289: .warn("Client attempting an illegal reconnect for id "
290: + connectionId + ", " + syn.getSource());
291: return;
292: }
293: connectionId = this .transport.getConnectionId();
294: sendSynAck(connectionId, syn.getSource());
295: }
296:
297: private boolean verifySyn(WireProtocolMessage message) {
298: return message instanceof TransportHandshakeMessage
299: && ((TransportHandshakeMessage) message).isSyn();
300: }
301:
302: private void sendSynAck(ConnectionID connectionId,
303: TCConnection source) {
304: sendSynAck(connectionId, null, source);
305: }
306:
307: private void sendSynAck(ConnectionID connectionId,
308: TransportHandshakeErrorContext errorContext,
309: TCConnection source) {
310: TransportHandshakeMessage synAck;
311: boolean isError = (errorContext != null);
312: int maxConnections = connectionPolicy.getMaxConnections();
313: connectionPolicy.clientConnected();
314: // NOTE: There's a race here which should be ok, since it doesn't matter which client gets told there are
315: // no more connections left...
316: boolean isMaxConnectionsExceeded = connectionPolicy
317: .maxConnectionsExceeded();
318: if (isError) {
319: synAck = handshakeMessageFactory.createSynAck(
320: connectionId, errorContext, source,
321: isMaxConnectionsExceeded, maxConnections);
322: } else {
323: synAck = handshakeMessageFactory.createSynAck(
324: connectionId, source, isMaxConnectionsExceeded,
325: maxConnections);
326: }
327: sendMessage(synAck);
328: }
329:
330: private void sendMessage(WireProtocolMessage message) {
331: transport.sendToConnection(message);
332: }
333: }
334:
335: }
|