001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object;
006:
007: import com.tc.async.api.SEDA;
008: import com.tc.async.api.Sink;
009: import com.tc.async.api.Stage;
010: import com.tc.async.api.StageManager;
011: import com.tc.cluster.Cluster;
012: import com.tc.config.schema.dynamic.ConfigItem;
013: import com.tc.lang.TCThreadGroup;
014: import com.tc.logging.ChannelIDLogger;
015: import com.tc.logging.ChannelIDLoggerProvider;
016: import com.tc.logging.CustomerLogging;
017: import com.tc.logging.TCLogger;
018: import com.tc.logging.TCLogging;
019: import com.tc.management.ClientLockStatManager;
020: import com.tc.management.ClientLockStatManagerImpl;
021: import com.tc.management.L1Management;
022: import com.tc.management.beans.sessions.SessionMonitorMBean;
023: import com.tc.management.remote.protocol.terracotta.JmxRemoteTunnelMessage;
024: import com.tc.management.remote.protocol.terracotta.L1JmxReady;
025: import com.tc.management.remote.protocol.terracotta.TunnelingEventHandler;
026: import com.tc.net.MaxConnectionsExceededException;
027: import com.tc.net.core.ConnectionAddressProvider;
028: import com.tc.net.core.ConnectionInfo;
029: import com.tc.net.protocol.NetworkStackHarnessFactory;
030: import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
031: import com.tc.net.protocol.delivery.OOOEventHandler;
032: import com.tc.net.protocol.delivery.OOONetworkStackHarnessFactory;
033: import com.tc.net.protocol.delivery.OnceAndOnlyOnceProtocolNetworkLayerFactoryImpl;
034: import com.tc.net.protocol.tcm.CommunicationsManager;
035: import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
036: import com.tc.net.protocol.tcm.HydrateHandler;
037: import com.tc.net.protocol.tcm.NullMessageMonitor;
038: import com.tc.net.protocol.tcm.TCMessageType;
039: import com.tc.net.protocol.transport.NullConnectionPolicy;
040: import com.tc.object.bytecode.Manager;
041: import com.tc.object.bytecode.hook.impl.PreparedComponentsFromL2Connection;
042: import com.tc.object.cache.CacheConfigImpl;
043: import com.tc.object.cache.CacheManager;
044: import com.tc.object.cache.ClockEvictionPolicy;
045: import com.tc.object.config.DSOClientConfigHelper;
046: import com.tc.object.dna.impl.DNAEncodingImpl;
047: import com.tc.object.event.DmiManager;
048: import com.tc.object.event.DmiManagerImpl;
049: import com.tc.object.field.TCFieldFactory;
050: import com.tc.object.gtx.ClientGlobalTransactionManager;
051: import com.tc.object.gtx.ClientGlobalTransactionManagerImpl;
052: import com.tc.object.handler.BatchTransactionAckHandler;
053: import com.tc.object.handler.ClientCoordinationHandler;
054: import com.tc.object.handler.DmiHandler;
055: import com.tc.object.handler.LockResponseHandler;
056: import com.tc.object.handler.LockStatisticsResponseHandler;
057: import com.tc.object.handler.ReceiveObjectHandler;
058: import com.tc.object.handler.ReceiveRootIDHandler;
059: import com.tc.object.handler.ReceiveTransactionCompleteHandler;
060: import com.tc.object.handler.ReceiveTransactionHandler;
061: import com.tc.object.handshakemanager.ClientHandshakeManager;
062: import com.tc.object.idprovider.api.ObjectIDProvider;
063: import com.tc.object.idprovider.impl.ObjectIDProviderImpl;
064: import com.tc.object.idprovider.impl.RemoteObjectIDBatchSequenceProvider;
065: import com.tc.object.loaders.ClassProvider;
066: import com.tc.object.lockmanager.api.ClientLockManager;
067: import com.tc.object.lockmanager.impl.ClientLockManagerImpl;
068: import com.tc.object.lockmanager.impl.RemoteLockManagerImpl;
069: import com.tc.object.lockmanager.impl.ThreadLockManagerImpl;
070: import com.tc.object.logging.RuntimeLogger;
071: import com.tc.object.logging.RuntimeLoggerImpl;
072: import com.tc.object.msg.AcknowledgeTransactionMessageImpl;
073: import com.tc.object.msg.BatchTransactionAcknowledgeMessageImpl;
074: import com.tc.object.msg.BroadcastTransactionMessageImpl;
075: import com.tc.object.msg.ClientHandshakeAckMessageImpl;
076: import com.tc.object.msg.ClientHandshakeMessageImpl;
077: import com.tc.object.msg.ClusterMembershipMessage;
078: import com.tc.object.msg.CommitTransactionMessageImpl;
079: import com.tc.object.msg.CompletedTransactionLowWaterMarkMessage;
080: import com.tc.object.msg.JMXMessage;
081: import com.tc.object.msg.LockRequestMessage;
082: import com.tc.object.msg.LockResponseMessage;
083: import com.tc.object.msg.LockStatisticsResponseMessage;
084: import com.tc.object.msg.ObjectIDBatchRequestMessage;
085: import com.tc.object.msg.ObjectIDBatchRequestResponseMessage;
086: import com.tc.object.msg.ObjectsNotFoundMessage;
087: import com.tc.object.msg.RequestManagedObjectMessageImpl;
088: import com.tc.object.msg.RequestManagedObjectResponseMessage;
089: import com.tc.object.msg.RequestRootMessageImpl;
090: import com.tc.object.msg.RequestRootResponseMessage;
091: import com.tc.object.net.DSOClientMessageChannel;
092: import com.tc.object.session.SessionManager;
093: import com.tc.object.session.SessionManagerImpl;
094: import com.tc.object.session.SessionProvider;
095: import com.tc.object.tx.ClientTransactionFactory;
096: import com.tc.object.tx.ClientTransactionFactoryImpl;
097: import com.tc.object.tx.ClientTransactionManager;
098: import com.tc.object.tx.ClientTransactionManagerImpl;
099: import com.tc.object.tx.LockAccounting;
100: import com.tc.object.tx.RemoteTransactionManager;
101: import com.tc.object.tx.RemoteTransactionManagerImpl;
102: import com.tc.object.tx.TransactionBatchAccounting;
103: import com.tc.object.tx.TransactionBatchFactory;
104: import com.tc.object.tx.TransactionBatchWriterFactory;
105: import com.tc.properties.TCProperties;
106: import com.tc.properties.TCPropertiesImpl;
107: import com.tc.util.Assert;
108: import com.tc.util.ProductInfo;
109: import com.tc.util.TCTimeoutException;
110: import com.tc.util.concurrent.ThreadUtil;
111: import com.tc.util.sequence.BatchSequence;
112: import com.tc.util.sequence.Sequence;
113: import com.tc.util.sequence.SimpleSequence;
114:
115: import java.io.IOException;
116: import java.net.ConnectException;
117: import java.util.Collection;
118: import java.util.Collections;
119:
120: /**
121: * This is the main point of entry into the DSO client.
122: */
123: public class DistributedObjectClient extends SEDA {
124:
125: private static final TCLogger logger = CustomerLogging
126: .getDSOGenericLogger();
127: private static final TCLogger consoleLogger = CustomerLogging
128: .getConsoleLogger();
129:
130: private final DSOClientConfigHelper config;
131: private final ClassProvider classProvider;
132: private final PreparedComponentsFromL2Connection connectionComponents;
133: private final Manager manager;
134: private final Cluster cluster;
135:
136: private DSOClientMessageChannel channel;
137: private ClientLockManager lockManager;
138: private ClientObjectManagerImpl objectManager;
139: private ClientTransactionManager txManager;
140: private CommunicationsManager communicationsManager;
141: private RemoteTransactionManager rtxManager;
142: private PauseListener pauseListener;
143: private ClientHandshakeManager clientHandshakeManager;
144: private RuntimeLogger runtimeLogger;
145: private CacheManager cacheManager;
146: private L1Management l1Management;
147: private TCProperties l1Properties;
148: private DmiManager dmiManager;
149:
150: public DistributedObjectClient(DSOClientConfigHelper config,
151: TCThreadGroup threadGroup, ClassProvider classProvider,
152: PreparedComponentsFromL2Connection connectionComponents,
153: Manager manager, Cluster cluster) {
154: super (threadGroup);
155: Assert.assertNotNull(config);
156: this .config = config;
157: this .classProvider = classProvider;
158: this .connectionComponents = connectionComponents;
159: this .pauseListener = new NullPauseListener();
160: this .manager = manager;
161: this .cluster = cluster;
162: }
163:
164: public void setPauseListener(PauseListener pauseListener) {
165: this .pauseListener = pauseListener;
166: }
167:
168: public void start() {
169: l1Properties = TCPropertiesImpl.getProperties()
170: .getPropertiesFor("l1");
171: int maxSize = 50000;
172: int faultCount = config.getFaultCount();
173:
174: final Sequence sessionSequence = new SimpleSequence();
175: final SessionManager sessionManager = new SessionManagerImpl(
176: sessionSequence);
177: final SessionProvider sessionProvider = (SessionProvider) sessionManager;
178:
179: StageManager stageManager = getStageManager();
180:
181: // stageManager.turnTracingOn();
182:
183: // //////////////////////////////////
184: // create NetworkStackHarnessFactory
185: final boolean useOOOLayer = l1Properties
186: .getBoolean("reconnect.enabled");
187: final NetworkStackHarnessFactory networkStackHarnessFactory;
188: if (useOOOLayer) {
189: final Stage oooStage = stageManager.createStage(
190: "OOONetStage", new OOOEventHandler(), 1, maxSize);
191: networkStackHarnessFactory = new OOONetworkStackHarnessFactory(
192: new OnceAndOnlyOnceProtocolNetworkLayerFactoryImpl(),
193: oooStage.getSink());
194: } else {
195: networkStackHarnessFactory = new PlainNetworkStackHarnessFactory();
196: }
197: // //////////////////////////////////
198:
199: communicationsManager = new CommunicationsManagerImpl(
200: new NullMessageMonitor(), networkStackHarnessFactory,
201: new NullConnectionPolicy());
202:
203: logger.debug("Created CommunicationsManager.");
204:
205: ConfigItem connectionInfoItem = this .connectionComponents
206: .createConnectionInfoConfigItem();
207: ConnectionInfo[] connectionInfo = (ConnectionInfo[]) connectionInfoItem
208: .getObject();
209: ConnectionAddressProvider addrProvider = new ConnectionAddressProvider(
210: connectionInfo);
211:
212: String serverHost = connectionInfo[0].getHostname();
213: int serverPort = connectionInfo[0].getPort();
214:
215: channel = new DSOClientMessageChannelImpl(communicationsManager
216: .createClientChannel(sessionProvider, -1, serverHost,
217: serverPort, 10000, addrProvider));
218: ChannelIDLoggerProvider cidLoggerProvider = new ChannelIDLoggerProvider(
219: channel.getChannelIDProvider());
220: stageManager.setLoggerProvider(cidLoggerProvider);
221:
222: ClientIDProvider clientIDProvider = new ClientIDProviderImpl(
223: channel.getChannelIDProvider());
224:
225: this .runtimeLogger = new RuntimeLoggerImpl(config);
226:
227: logger.debug("Created channel.");
228:
229: ClientTransactionFactory txFactory = new ClientTransactionFactoryImpl(
230: runtimeLogger);
231:
232: TransactionBatchFactory txBatchFactory = new TransactionBatchWriterFactory(
233: channel.getCommitTransactionMessageFactory(),
234: new DNAEncodingImpl(classProvider));
235:
236: rtxManager = new RemoteTransactionManagerImpl(
237: new ChannelIDLogger(
238: channel.getChannelIDProvider(),
239: TCLogging
240: .getLogger(RemoteTransactionManagerImpl.class)),
241: txBatchFactory, new TransactionBatchAccounting(),
242: new LockAccounting(), sessionManager, channel);
243:
244: ClientGlobalTransactionManager gtxManager = new ClientGlobalTransactionManagerImpl(
245: rtxManager);
246:
247: ClientLockStatManager lockStatManager = new ClientLockStatManagerImpl();
248:
249: lockManager = new ClientLockManagerImpl(new ChannelIDLogger(
250: channel.getChannelIDProvider(), TCLogging
251: .getLogger(ClientLockManager.class)),
252: new RemoteLockManagerImpl(channel
253: .getLockRequestMessageFactory(), gtxManager),
254: sessionManager, lockStatManager);
255:
256: RemoteObjectManager remoteObjectManager = new RemoteObjectManagerImpl(
257: new ChannelIDLogger(channel.getChannelIDProvider(),
258: TCLogging.getLogger(RemoteObjectManager.class)),
259: clientIDProvider, channel
260: .getRequestRootMessageFactory(), channel
261: .getRequestManagedObjectMessageFactory(),
262: new NullObjectRequestMonitor(), faultCount,
263: sessionManager);
264:
265: RemoteObjectIDBatchSequenceProvider remoteIDProvider = new RemoteObjectIDBatchSequenceProvider(
266: channel.getObjectIDBatchRequestMessageFactory());
267: BatchSequence sequence = new BatchSequence(remoteIDProvider,
268: 50000);
269: ObjectIDProvider idProvider = new ObjectIDProviderImpl(sequence);
270:
271: TCClassFactory classFactory = new TCClassFactoryImpl(
272: new TCFieldFactory(config), config, classProvider);
273: TCObjectFactory objectFactory = new TCObjectFactoryImpl(
274: classFactory);
275:
276: objectManager = new ClientObjectManagerImpl(
277: remoteObjectManager, config, idProvider,
278: new ClockEvictionPolicy(-1), runtimeLogger, channel
279: .getChannelIDProvider(), classProvider,
280: classFactory, objectFactory, config.getPortability(),
281: channel);
282:
283: TCProperties cacheManagerProperties = l1Properties
284: .getPropertiesFor("cachemanager");
285: if (cacheManagerProperties.getBoolean("enabled")) {
286: this .cacheManager = new CacheManager(objectManager,
287: new CacheConfigImpl(cacheManagerProperties),
288: getThreadGroup());
289: if (logger.isDebugEnabled()) {
290: logger.debug("CacheManager Enabled : " + cacheManager);
291: }
292: } else {
293: logger.warn("CacheManager is Disabled");
294: }
295:
296: // Set up the JMX management stuff
297: final TunnelingEventHandler teh = new TunnelingEventHandler(
298: channel.channel());
299: l1Management = new L1Management(teh);
300: l1Management.start();
301:
302: txManager = new ClientTransactionManagerImpl(channel
303: .getChannelIDProvider(), objectManager,
304: new ThreadLockManagerImpl(lockManager), txFactory,
305: rtxManager, runtimeLogger, l1Management
306: .findClientTxMonitorMBean());
307:
308: Stage lockResponse = stageManager.createStage(
309: ClientConfigurationContext.LOCK_RESPONSE_STAGE,
310: new LockResponseHandler(sessionManager), 1, maxSize);
311: Stage receiveRootID = stageManager.createStage(
312: ClientConfigurationContext.RECEIVE_ROOT_ID_STAGE,
313: new ReceiveRootIDHandler(), 1, maxSize);
314: Stage receiveObject = stageManager.createStage(
315: ClientConfigurationContext.RECEIVE_OBJECT_STAGE,
316: new ReceiveObjectHandler(), 1, maxSize);
317: this .dmiManager = new DmiManagerImpl(classProvider,
318: objectManager, runtimeLogger);
319: Stage dmiStage = stageManager.createStage(
320: ClientConfigurationContext.DMI_STAGE, new DmiHandler(
321: dmiManager), 1, maxSize);
322:
323: Stage receiveTransaction = stageManager.createStage(
324: ClientConfigurationContext.RECEIVE_TRANSACTION_STAGE,
325: new ReceiveTransactionHandler(channel
326: .getChannelIDProvider(), channel
327: .getAcknowledgeTransactionMessageFactory(),
328: gtxManager, sessionManager, dmiStage.getSink(),
329: dmiManager), 1, maxSize);
330: Stage oidRequestResponse = stageManager
331: .createStage(
332: ClientConfigurationContext.OBJECT_ID_REQUEST_RESPONSE_STAGE,
333: remoteIDProvider, 1, maxSize);
334: Stage transactionResponse = stageManager
335: .createStage(
336: ClientConfigurationContext.RECEIVE_TRANSACTION_COMPLETE_STAGE,
337: new ReceiveTransactionCompleteHandler(), 1,
338: maxSize);
339: Stage hydrateStage = stageManager.createStage(
340: ClientConfigurationContext.HYDRATE_MESSAGE_STAGE,
341: new HydrateHandler(), 1, maxSize);
342: Stage batchTxnAckStage = stageManager.createStage(
343: ClientConfigurationContext.BATCH_TXN_ACK_STAGE,
344: new BatchTransactionAckHandler(), 1, maxSize);
345:
346: // By design this stage needs to be single threaded. If it wasn't then cluster memebership messages could get
347: // processed before the client handshake ack, and this client would get a faulty view of the cluster at best, or
348: // more likely an AssertionError
349: Stage pauseStage = stageManager.createStage(
350: ClientConfigurationContext.CLIENT_COORDINATION_STAGE,
351: new ClientCoordinationHandler(cluster), 1, maxSize);
352:
353: Stage lockStatisticsStage = stageManager
354: .createStage(
355: ClientConfigurationContext.LOCK_STATISTICS_RESPONSE_STAGE,
356: new LockStatisticsResponseHandler(), 1, 1);
357: lockStatManager.start(channel, lockStatisticsStage.getSink());
358:
359: final Stage jmxRemoteTunnelStage = stageManager.createStage(
360: ClientConfigurationContext.JMXREMOTE_TUNNEL_STAGE, teh,
361: 1, maxSize);
362:
363: // This set is designed to give the handshake manager an opportunity to pause stages when it is pausing due to
364: // disconnect. Unfortunately, the lock response stage can block, which I didn't realize at the time, so it's not
365: // being used.
366: Collection stagesToPauseOnDisconnect = Collections.EMPTY_LIST;
367: ProductInfo pInfo = ProductInfo.getInstance();
368: clientHandshakeManager = new ClientHandshakeManager(
369: new ChannelIDLogger(
370: channel.getChannelIDProvider(),
371: TCLogging
372: .getLogger(ClientHandshakeManager.class)),
373: clientIDProvider, channel
374: .getClientHandshakeMessageFactory(),
375: objectManager, remoteObjectManager, lockManager,
376: rtxManager, gtxManager, stagesToPauseOnDisconnect,
377: pauseStage.getSink(), sessionManager, pauseListener,
378: sequence, cluster, pInfo.buildVersion());
379: channel.addListener(clientHandshakeManager);
380:
381: ClientConfigurationContext cc = new ClientConfigurationContext(
382: stageManager, lockManager, remoteObjectManager,
383: txManager, clientHandshakeManager);
384: stageManager.startAll(cc);
385:
386: channel.addClassMapping(
387: TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE,
388: BatchTransactionAcknowledgeMessageImpl.class);
389: channel.addClassMapping(TCMessageType.REQUEST_ROOT_MESSAGE,
390: RequestRootMessageImpl.class);
391: channel.addClassMapping(TCMessageType.LOCK_REQUEST_MESSAGE,
392: LockRequestMessage.class);
393: channel.addClassMapping(TCMessageType.LOCK_RESPONSE_MESSAGE,
394: LockResponseMessage.class);
395: channel.addClassMapping(TCMessageType.LOCK_RECALL_MESSAGE,
396: LockResponseMessage.class);
397: channel.addClassMapping(
398: TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE,
399: LockResponseMessage.class);
400: channel.addClassMapping(TCMessageType.LOCK_STAT_MESSAGE,
401: LockResponseMessage.class);
402: channel.addClassMapping(
403: TCMessageType.LOCK_STATISTICS_RESPONSE_MESSAGE,
404: LockStatisticsResponseMessage.class);
405: channel.addClassMapping(
406: TCMessageType.COMMIT_TRANSACTION_MESSAGE,
407: CommitTransactionMessageImpl.class);
408: channel.addClassMapping(
409: TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE,
410: RequestRootResponseMessage.class);
411: channel.addClassMapping(
412: TCMessageType.REQUEST_MANAGED_OBJECT_MESSAGE,
413: RequestManagedObjectMessageImpl.class);
414: channel.addClassMapping(
415: TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE,
416: RequestManagedObjectResponseMessage.class);
417: channel.addClassMapping(
418: TCMessageType.OBJECTS_NOT_FOUND_RESPONSE_MESSAGE,
419: ObjectsNotFoundMessage.class);
420: channel.addClassMapping(
421: TCMessageType.BROADCAST_TRANSACTION_MESSAGE,
422: BroadcastTransactionMessageImpl.class);
423: channel.addClassMapping(
424: TCMessageType.OBJECT_ID_BATCH_REQUEST_MESSAGE,
425: ObjectIDBatchRequestMessage.class);
426: channel.addClassMapping(
427: TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE,
428: ObjectIDBatchRequestResponseMessage.class);
429: channel.addClassMapping(
430: TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE,
431: AcknowledgeTransactionMessageImpl.class);
432: channel.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_MESSAGE,
433: ClientHandshakeMessageImpl.class);
434: channel.addClassMapping(
435: TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE,
436: ClientHandshakeAckMessageImpl.class);
437: channel.addClassMapping(TCMessageType.JMX_MESSAGE,
438: JMXMessage.class);
439: channel.addClassMapping(
440: TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE,
441: JmxRemoteTunnelMessage.class);
442: channel.addClassMapping(
443: TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE,
444: ClusterMembershipMessage.class);
445: channel.addClassMapping(TCMessageType.CLIENT_JMX_READY_MESSAGE,
446: L1JmxReady.class);
447: channel
448: .addClassMapping(
449: TCMessageType.COMPLETED_TRANSACTION_LOWWATERMARK_MESSAGE,
450: CompletedTransactionLowWaterMarkMessage.class);
451:
452: logger.debug("Added class mappings.");
453:
454: Sink hydrateSink = hydrateStage.getSink();
455: channel.routeMessageType(TCMessageType.LOCK_RESPONSE_MESSAGE,
456: lockResponse.getSink(), hydrateSink);
457: channel.routeMessageType(
458: TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE, lockResponse
459: .getSink(), hydrateSink);
460: channel.routeMessageType(TCMessageType.LOCK_STAT_MESSAGE,
461: lockResponse.getSink(), hydrateSink);
462: channel.routeMessageType(TCMessageType.LOCK_RECALL_MESSAGE,
463: lockResponse.getSink(), hydrateSink);
464: channel.routeMessageType(
465: TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE,
466: receiveRootID.getSink(), hydrateSink);
467: channel.routeMessageType(
468: TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE,
469: receiveObject.getSink(), hydrateSink);
470: channel.routeMessageType(
471: TCMessageType.OBJECTS_NOT_FOUND_RESPONSE_MESSAGE,
472: receiveObject.getSink(), hydrateSink);
473: channel.routeMessageType(
474: TCMessageType.BROADCAST_TRANSACTION_MESSAGE,
475: receiveTransaction.getSink(), hydrateSink);
476: channel.routeMessageType(
477: TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE,
478: oidRequestResponse.getSink(), hydrateSink);
479: channel.routeMessageType(
480: TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE,
481: transactionResponse.getSink(), hydrateSink);
482: channel.routeMessageType(
483: TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE,
484: batchTxnAckStage.getSink(), hydrateSink);
485: channel.routeMessageType(
486: TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE, pauseStage
487: .getSink(), hydrateSink);
488: channel.routeMessageType(
489: TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE,
490: jmxRemoteTunnelStage.getSink(), hydrateSink);
491: channel.routeMessageType(
492: TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE,
493: pauseStage.getSink(), hydrateSink);
494:
495: final int maxConnectRetries = l1Properties
496: .getInt("max.connect.retries");
497: int i = 0;
498: while (maxConnectRetries <= 0 || i < maxConnectRetries) {
499: try {
500: logger.debug("Trying to open channel....");
501: channel.open();
502: logger.debug("Channel open");
503: break;
504: } catch (TCTimeoutException tcte) {
505: consoleLogger.warn("Timeout connecting to server: "
506: + serverHost + ":" + serverPort + ". "
507: + tcte.getMessage());
508: ThreadUtil.reallySleep(5000);
509: } catch (ConnectException e) {
510: consoleLogger.warn("Connection refused from server: "
511: + serverHost + ":" + serverPort);
512: ThreadUtil.reallySleep(5000);
513: } catch (MaxConnectionsExceededException e) {
514: consoleLogger
515: .warn("Connection refused MAXIMUM CONNECTIONS TO SERVER EXCEEDED: "
516: + serverHost + ":" + serverPort);
517: ThreadUtil.reallySleep(5000);
518: } catch (IOException ioe) {
519: ioe.printStackTrace();
520: throw new RuntimeException(ioe);
521: }
522: i++;
523: }
524: if (i == maxConnectRetries) {
525: consoleLogger.error("MaxConnectRetries '"
526: + maxConnectRetries + "' attempted. Exiting.");
527: System.exit(-1);
528: }
529: clientHandshakeManager.waitForHandshake();
530:
531: cluster.addClusterEventListener(l1Management
532: .getTerracottaCluster());
533: }
534:
535: public void stop() {
536: manager.stop();
537: }
538:
539: public ClientLockManager getLockManager() {
540: return lockManager;
541: }
542:
543: public ClientTransactionManager getTransactionManager() {
544: return txManager;
545: }
546:
547: public ClientObjectManager getObjectManager() {
548: return objectManager;
549: }
550:
551: public RemoteTransactionManager getRemoteTransactionManager() {
552: return rtxManager;
553: }
554:
555: public CommunicationsManager getCommunicationsManager() {
556: return communicationsManager;
557: }
558:
559: public DSOClientMessageChannel getChannel() {
560: return channel;
561: }
562:
563: public ClientHandshakeManager getClientHandshakeManager() {
564: return clientHandshakeManager;
565: }
566:
567: public RuntimeLogger getRuntimeLogger() {
568: return runtimeLogger;
569: }
570:
571: public SessionMonitorMBean getSessionMonitorMBean() {
572: return l1Management.findSessionMonitorMBean();
573: }
574:
575: public DmiManager getDmiManager() {
576: return dmiManager;
577: }
578:
579: }
|