001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.async.impl.InBandMoveToNextSink;
012: import com.tc.logging.TCLogger;
013: import com.tc.net.groups.ClientID;
014: import com.tc.net.groups.NodeID;
015: import com.tc.net.protocol.tcm.CommunicationsManager;
016: import com.tc.net.protocol.tcm.MessageChannel;
017: import com.tc.net.protocol.tcm.TCMessageType;
018: import com.tc.object.msg.ClusterMembershipMessage;
019: import com.tc.object.net.DSOChannelManager;
020: import com.tc.object.net.DSOChannelManagerEventListener;
021: import com.tc.objectserver.context.NodeStateEventContext;
022: import com.tc.objectserver.core.api.ServerConfigurationContext;
023: import com.tc.objectserver.tx.ServerTransactionManager;
024: import com.tc.objectserver.tx.TransactionBatchManager;
025:
026: public class ChannelLifeCycleHandler extends AbstractEventHandler
027: implements DSOChannelManagerEventListener {
028: private final ServerTransactionManager transactionManager;
029: private final TransactionBatchManager transactionBatchManager;
030: private TCLogger logger;
031: private final CommunicationsManager commsManager;
032: private final DSOChannelManager channelMgr;
033: private Sink channelSink;
034: private Sink hydrateSink;
035: private Sink processTransactionSink;
036:
037: public ChannelLifeCycleHandler(CommunicationsManager commsManager,
038: ServerTransactionManager transactionManager,
039: TransactionBatchManager transactionBatchManager,
040: DSOChannelManager channelManager) {
041: this .commsManager = commsManager;
042: this .transactionManager = transactionManager;
043: this .transactionBatchManager = transactionBatchManager;
044: this .channelMgr = channelManager;
045: }
046:
047: public void handleEvent(EventContext context) {
048: NodeStateEventContext event = (NodeStateEventContext) context;
049:
050: switch (event.getType()) {
051: case NodeStateEventContext.CREATE: {
052: channelCreated(event.getNodeID());
053: break;
054: }
055: case NodeStateEventContext.REMOVE: {
056: channelRemoved(event.getNodeID());
057: break;
058: }
059: default: {
060: throw new AssertionError("unknown event: "
061: + event.getType());
062: }
063: }
064: }
065:
066: private void channelRemoved(NodeID nodeID) {
067: broadcastClusterMembershipMessage(
068: ClusterMembershipMessage.EventType.NODE_DISCONNECTED,
069: nodeID);
070: if (commsManager.isInShutdown()) {
071: logger.info("Ignoring transport disconnect for " + nodeID
072: + " while shutting down.");
073: } else {
074: logger
075: .info("Received transport disconnect. Shutting down client "
076: + nodeID);
077: transactionManager.shutdownNode(nodeID);
078: transactionBatchManager.shutdownNode(nodeID);
079: }
080: }
081:
082: private void channelCreated(NodeID nodeID) {
083: broadcastClusterMembershipMessage(
084: ClusterMembershipMessage.EventType.NODE_CONNECTED,
085: nodeID);
086: }
087:
088: private void broadcastClusterMembershipMessage(int eventType,
089: NodeID nodeID) {
090: MessageChannel[] channels = channelMgr.getActiveChannels();
091: for (int i = 0; i < channels.length; i++) {
092: MessageChannel channel = channels[i];
093:
094: if (!channelMgr.getClientIDFor(channel.getChannelID())
095: .equals(nodeID)) {
096: ClusterMembershipMessage cmm = (ClusterMembershipMessage) channel
097: .createMessage(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE);
098: cmm.initialize(eventType, nodeID, channels);
099: cmm.send();
100: }
101: }
102: }
103:
104: public void initialize(ConfigurationContext context) {
105: super .initialize(context);
106: ServerConfigurationContext scc = (ServerConfigurationContext) context;
107: this .logger = scc.getLogger(ChannelLifeCycleHandler.class);
108: channelSink = scc.getStage(
109: ServerConfigurationContext.CHANNEL_LIFE_CYCLE_STAGE)
110: .getSink();
111: hydrateSink = scc.getStage(
112: ServerConfigurationContext.HYDRATE_MESSAGE_SINK)
113: .getSink();
114: processTransactionSink = scc.getStage(
115: ServerConfigurationContext.PROCESS_TRANSACTION_STAGE)
116: .getSink();
117: }
118:
119: public void channelCreated(MessageChannel channel) {
120: channelSink.add(new NodeStateEventContext(
121: NodeStateEventContext.CREATE, new ClientID(channel
122: .getChannelID())));
123: }
124:
125: public void channelRemoved(MessageChannel channel) {
126: // We want all the messages in the system from this client to reach its destinations before processing this request.
127: // esp. hydrate stage and process transaction stage. This goo is for that.
128: final NodeStateEventContext disconnectEvent = new NodeStateEventContext(
129: NodeStateEventContext.REMOVE, new ClientID(channel
130: .getChannelID()));
131: InBandMoveToNextSink context1 = new InBandMoveToNextSink(
132: disconnectEvent, channelSink);
133: InBandMoveToNextSink context2 = new InBandMoveToNextSink(
134: context1, processTransactionSink);
135: hydrateSink.add(context2);
136: }
137:
138: }
|