001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.tcm;
006:
007: import com.tc.async.api.Sink;
008: import com.tc.async.impl.NullSink;
009: import com.tc.exception.TCRuntimeException;
010: import com.tc.logging.TCLogger;
011: import com.tc.logging.TCLogging;
012: import com.tc.net.TCSocketAddress;
013: import com.tc.net.core.ConnectionAddressProvider;
014: import com.tc.net.core.Constants;
015: import com.tc.net.core.TCConnection;
016: import com.tc.net.core.TCConnectionManager;
017: import com.tc.net.core.TCConnectionManagerJDK14;
018: import com.tc.net.core.TCListener;
019: import com.tc.net.protocol.NetworkStackHarness;
020: import com.tc.net.protocol.NetworkStackHarnessFactory;
021: import com.tc.net.protocol.transport.ClientConnectionEstablisher;
022: import com.tc.net.protocol.transport.ClientMessageTransport;
023: import com.tc.net.protocol.transport.ConnectionID;
024: import com.tc.net.protocol.transport.ConnectionIDFactory;
025: import com.tc.net.protocol.transport.ConnectionPolicy;
026: import com.tc.net.protocol.transport.MessageTransport;
027: import com.tc.net.protocol.transport.MessageTransportFactory;
028: import com.tc.net.protocol.transport.MessageTransportListener;
029: import com.tc.net.protocol.transport.ServerMessageTransport;
030: import com.tc.net.protocol.transport.ServerStackProvider;
031: import com.tc.net.protocol.transport.TransportHandshakeErrorContext;
032: import com.tc.net.protocol.transport.TransportHandshakeErrorHandler;
033: import com.tc.net.protocol.transport.TransportHandshakeMessage;
034: import com.tc.net.protocol.transport.TransportHandshakeMessageFactory;
035: import com.tc.net.protocol.transport.TransportHandshakeMessageFactoryImpl;
036: import com.tc.net.protocol.transport.WireProtocolAdaptorFactoryImpl;
037: import com.tc.net.protocol.transport.WireProtocolMessageSink;
038: import com.tc.object.session.SessionProvider;
039: import com.tc.util.concurrent.SetOnceFlag;
040:
041: import java.io.IOException;
042: import java.util.HashSet;
043: import java.util.List;
044: import java.util.Set;
045:
046: /**
047: * Communications manager for setting up listners and creating client connections
048: *
049: * @author teck
050: */
051: public class CommunicationsManagerImpl implements CommunicationsManager {
052: private static final TCLogger logger = TCLogging
053: .getLogger(CommunicationsManager.class);
054:
055: private final SetOnceFlag shutdown = new SetOnceFlag();
056: private final Set listeners = new HashSet();
057: private final TCConnectionManager connectionManager;
058: private final boolean privateConnMgr;
059: private final NetworkStackHarnessFactory stackHarnessFactory;
060: private final TransportHandshakeMessageFactory transportHandshakeMessageFactory;
061: private final MessageMonitor monitor;
062: private final ConnectionPolicy connectionPolicy;
063:
064: /**
065: * Create a communications manager. This implies that one or more network handling threads will be started on your
066: * behalf. As such, you should not be instantiating one of these per connection for instance.
067: */
068: public CommunicationsManagerImpl(MessageMonitor monitor,
069: NetworkStackHarnessFactory stackHarnessFactory,
070: ConnectionPolicy connectionPolicy) {
071: this (monitor, stackHarnessFactory, null, connectionPolicy);
072: }
073:
074: /**
075: * Create a comms manager with the given connection manager. This cstr is mostly for testing, or in the event that you
076: * actually want to use an explicit connection manager
077: *
078: * @param connMgr the connection manager to use
079: * @param serverDescriptors
080: */
081: public CommunicationsManagerImpl(MessageMonitor monitor,
082: NetworkStackHarnessFactory stackHarnessFactory,
083: TCConnectionManager connMgr,
084: ConnectionPolicy connectionPolicy) {
085:
086: this .monitor = monitor;
087: this .transportHandshakeMessageFactory = new TransportHandshakeMessageFactoryImpl();
088: this .connectionPolicy = connectionPolicy;
089: this .stackHarnessFactory = stackHarnessFactory;
090: privateConnMgr = (connMgr == null);
091:
092: if (null == connMgr) {
093: this .connectionManager = new TCConnectionManagerJDK14();
094: } else {
095: this .connectionManager = connMgr;
096: }
097: }
098:
099: public TCConnectionManager getConnectionManager() {
100: return this .connectionManager;
101: }
102:
103: public boolean isInShutdown() {
104: return shutdown.isSet();
105: }
106:
107: public void shutdown() {
108: if (shutdown.attemptSet()) {
109: if (privateConnMgr) {
110: connectionManager.shutdown();
111: }
112: } else {
113: logger.warn("shutdown already started");
114: }
115: }
116:
117: public NetworkListener[] getAllListeners() {
118: synchronized (listeners) {
119: return (NetworkListener[]) listeners
120: .toArray(new NetworkListener[listeners.size()]);
121: }
122: }
123:
124: public ClientMessageChannel createClientChannel(
125: final SessionProvider sessionProvider,
126: final int maxReconnectTries, String hostname, int port,
127: final int timeout, ConnectionAddressProvider addressProvider) {
128: // XXX: maxReconnectTries MUST be non-zero if we have a
129: // once and only once protocol stack.
130:
131: final ConnectionAddressProvider provider = addressProvider;
132:
133: ClientMessageChannelImpl rv = new ClientMessageChannelImpl(
134: new TCMessageFactoryImpl(sessionProvider, monitor),
135: new TCMessageRouterImpl(), sessionProvider);
136:
137: MessageTransportFactory transportFactory = new MessageTransportFactory() {
138:
139: public MessageTransport createNewTransport() {
140: TransportHandshakeErrorHandler handshakeErrorHandler = new TransportHandshakeErrorHandler() {
141:
142: public void handleHandshakeError(
143: TransportHandshakeErrorContext e) {
144: System.err.println(e);
145: new TCRuntimeException(
146: "I'm crashing the client!")
147: .printStackTrace();
148: try {
149: Thread.sleep(30 * 1000);
150: } catch (InterruptedException e1) {
151: e1.printStackTrace();
152: }
153: System.exit(1);
154: }
155:
156: public void handleHandshakeError(
157: TransportHandshakeErrorContext e,
158: TransportHandshakeMessage m) {
159: System.err.println(e);
160: System.err.println(m);
161: new TCRuntimeException(
162: "I'm crashing the client")
163: .printStackTrace();
164: try {
165: Thread.sleep(30 * 1000);
166: } catch (InterruptedException e1) {
167: e1.printStackTrace();
168: }
169: System.exit(1);
170: }
171:
172: };
173:
174: ClientConnectionEstablisher clientConnectionEstablisher = new ClientConnectionEstablisher(
175: connectionManager, provider, maxReconnectTries,
176: timeout);
177: ClientMessageTransport cmt = new ClientMessageTransport(
178: clientConnectionEstablisher,
179: handshakeErrorHandler,
180: transportHandshakeMessageFactory,
181: new WireProtocolAdaptorFactoryImpl());
182: return cmt;
183: }
184:
185: public MessageTransport createNewTransport(
186: ConnectionID connectionID,
187: TransportHandshakeErrorHandler handler,
188: TransportHandshakeMessageFactory handshakeMessageFactory,
189: List transportListeners) {
190: throw new AssertionError();
191: }
192:
193: public MessageTransport createNewTransport(
194: ConnectionID connectionId,
195: TCConnection connection,
196: TransportHandshakeErrorHandler handler,
197: TransportHandshakeMessageFactory handshakeMessageFactory,
198: List transportListeners) {
199: throw new AssertionError();
200: }
201:
202: };
203: NetworkStackHarness stackHarness = this .stackHarnessFactory
204: .createClientHarness(transportFactory, rv,
205: new MessageTransportListener[0]);
206:
207: stackHarness.finalizeStack();
208:
209: return rv;
210: }
211:
212: /**
213: * Creates a network listener with a default network stack.
214: */
215: public NetworkListener createListener(
216: SessionProvider sessionProvider, TCSocketAddress addr,
217: boolean transportDisconnectRemovesChannel,
218: ConnectionIDFactory connectionIdFactory) {
219: return createListener(sessionProvider, addr,
220: transportDisconnectRemovesChannel, connectionIdFactory,
221: true);
222: }
223:
224: public NetworkListener createListener(
225: SessionProvider sessionProvider, TCSocketAddress address,
226: boolean transportDisconnectRemovesChannel,
227: ConnectionIDFactory connectionIDFactory, Sink httpSink) {
228: return createListener(sessionProvider, address,
229: transportDisconnectRemovesChannel, connectionIDFactory,
230: true, httpSink, null);
231: }
232:
233: public NetworkListener createListener(
234: SessionProvider sessionProvider, TCSocketAddress addr,
235: boolean transportDisconnectRemovesChannel,
236: ConnectionIDFactory connectionIdFactory, boolean reuseAddr) {
237: return createListener(sessionProvider, addr,
238: transportDisconnectRemovesChannel, connectionIdFactory,
239: reuseAddr, new NullSink(), null);
240: }
241:
242: public NetworkListener createListener(
243: SessionProvider sessionProvider, TCSocketAddress addr,
244: boolean transportDisconnectRemovesChannel,
245: ConnectionIDFactory connectionIdFactory,
246: WireProtocolMessageSink wireProtoMsgSnk) {
247: return createListener(sessionProvider, addr,
248: transportDisconnectRemovesChannel, connectionIdFactory,
249: true, new NullSink(), wireProtoMsgSnk);
250: }
251:
252: /**
253: * Creates a network listener with a default network stack.
254: */
255: private NetworkListener createListener(
256: SessionProvider sessionProvider, TCSocketAddress addr,
257: boolean transportDisconnectRemovesChannel,
258: ConnectionIDFactory connectionIdFactory, boolean reuseAddr,
259: Sink httpSink, WireProtocolMessageSink wireProtoMsgSnk) {
260: if (shutdown.isSet()) {
261: throw new IllegalStateException("Comms manger shut down");
262: }
263:
264: // The idea here is that someday we might want to pass in a custom channel factory. The reason you might want to do
265: // that is so thay you can control the actual class of the channels created off this listener
266: final TCMessageRouter msgRouter = new TCMessageRouterImpl();
267: final TCMessageFactory msgFactory = new TCMessageFactoryImpl(
268: sessionProvider, monitor);
269: final ServerMessageChannelFactory channelFactory = new ServerMessageChannelFactory() {
270: public MessageChannelInternal createNewChannel(ChannelID id) {
271: return new ServerMessageChannelImpl(id, msgRouter,
272: msgFactory);
273: }
274: };
275:
276: final ChannelManagerImpl channelManager = new ChannelManagerImpl(
277: transportDisconnectRemovesChannel, channelFactory);
278:
279: return new NetworkListenerImpl(addr, this , channelManager,
280: msgFactory, msgRouter, reuseAddr, connectionIdFactory,
281: httpSink, wireProtoMsgSnk);
282: }
283:
284: TCListener createCommsListener(TCSocketAddress addr,
285: final ServerMessageChannelFactory channelFactory,
286: boolean resueAddr, Set initialConnectionIDs,
287: ConnectionIDFactory connectionIdFactory, Sink httpSink,
288: WireProtocolMessageSink wireProtocolMessageSink)
289: throws IOException {
290:
291: MessageTransportFactory transportFactory = new MessageTransportFactory() {
292:
293: public MessageTransport createNewTransport() {
294: throw new AssertionError();
295: }
296:
297: public MessageTransport createNewTransport(
298: ConnectionID connectionID,
299: TransportHandshakeErrorHandler handler,
300: TransportHandshakeMessageFactory handshakeMessageFactory,
301: List transportListeners) {
302: MessageTransport rv = new ServerMessageTransport(
303: connectionID, handler, handshakeMessageFactory);
304: rv.addTransportListeners(transportListeners);
305: return rv;
306: }
307:
308: public MessageTransport createNewTransport(
309: ConnectionID connectionId,
310: TCConnection connection,
311: TransportHandshakeErrorHandler handler,
312: TransportHandshakeMessageFactory handshakeMessageFactory,
313: List transportListeners) {
314: MessageTransport rv = new ServerMessageTransport(
315: connectionId, connection, handler,
316: handshakeMessageFactory);
317: rv.addTransportListeners(transportListeners);
318: return rv;
319: }
320:
321: };
322:
323: ServerStackProvider stackProvider = new ServerStackProvider(
324: TCLogging.getLogger(ServerStackProvider.class),
325: initialConnectionIDs, stackHarnessFactory,
326: channelFactory, transportFactory,
327: this .transportHandshakeMessageFactory,
328: connectionIdFactory, this .connectionPolicy,
329: new WireProtocolAdaptorFactoryImpl(httpSink),
330: wireProtocolMessageSink);
331: return connectionManager.createListener(addr, stackProvider,
332: Constants.DEFAULT_ACCEPT_QUEUE_DEPTH, resueAddr);
333: }
334:
335: void registerListener(NetworkListener lsnr) {
336: synchronized (listeners) {
337: boolean added = listeners.add(lsnr);
338:
339: if (!added) {
340: logger
341: .warn("replaced an existing listener in the listener map");
342: }
343: }
344: }
345:
346: void unregisterListener(NetworkListener lsnr) {
347: synchronized (listeners) {
348: listeners.remove(lsnr);
349: }
350: }
351:
352: }
|