001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.management.remote.protocol.terracotta;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.EventContext;
009: import com.tc.async.api.EventHandlerException;
010: import com.tc.async.api.Sink;
011: import com.tc.logging.TCLogger;
012: import com.tc.logging.TCLogging;
013: import com.tc.net.protocol.tcm.ChannelID;
014: import com.tc.net.protocol.tcm.MessageChannel;
015: import com.tc.net.protocol.tcm.TCMessageType;
016: import com.tc.object.net.DSOChannelManagerEventListener;
017:
018: import java.util.HashMap;
019: import java.util.Map;
020:
021: import javax.management.MBeanServer;
022: import javax.management.MBeanServerFactory;
023: import javax.management.remote.message.Message;
024:
025: public class ClientTunnelingEventHandler extends AbstractEventHandler
026: implements DSOChannelManagerEventListener {
027:
028: public static final class L1ConnectionMessage implements
029: EventContext {
030:
031: private final MBeanServer mbs;
032: private final MessageChannel channel;
033: private final Map channelIdToJmxConnector;
034: private final Map channelIdToMsgConnection;
035: private final boolean isConnectingMsg;
036:
037: public L1ConnectionMessage(MBeanServer mbs,
038: MessageChannel channel, Map channelIdToJmxConnector,
039: Map channelIdToMsgConnection, boolean isConnectingMsg) {
040: this .mbs = mbs;
041: this .channel = channel;
042: this .channelIdToJmxConnector = channelIdToJmxConnector;
043: this .channelIdToMsgConnection = channelIdToMsgConnection;
044: this .isConnectingMsg = isConnectingMsg;
045:
046: if (isConnectingMsg && mbs == null) {
047: final AssertionError ae = new AssertionError(
048: "Attempting to create a L1-connecting-message without"
049: + " a valid mBeanServer.");
050: throw ae;
051: }
052: }
053:
054: public MBeanServer getMBeanServer() {
055: return mbs;
056: }
057:
058: public MessageChannel getChannel() {
059: return channel;
060: }
061:
062: public Map getChannelIdToJmxConnector() {
063: return channelIdToJmxConnector;
064: }
065:
066: public Map getChannelIdToMsgConnector() {
067: return channelIdToMsgConnection;
068: }
069:
070: public boolean isConnectingMsg() {
071: return isConnectingMsg;
072: }
073: }
074:
075: private static final TCLogger logger = TCLogging
076: .getLogger(ClientTunnelingEventHandler.class);
077:
078: private final Map channelIdToJmxConnector;
079: private final Map channelIdToMsgConnection;
080: private final MBeanServer l2MBeanServer;
081: private final Object sinkLock;
082: private Sink connectStageSink;
083:
084: public ClientTunnelingEventHandler() {
085: l2MBeanServer = (MBeanServer) MBeanServerFactory
086: .findMBeanServer(null).get(0);
087: channelIdToJmxConnector = new HashMap();
088: channelIdToMsgConnection = new HashMap();
089: sinkLock = new Object();
090: }
091:
092: public void handleEvent(final EventContext context)
093: throws EventHandlerException {
094: if (context instanceof L1JmxReady) {
095: final L1JmxReady readyMessage = (L1JmxReady) context;
096: connectToL1JmxServer(readyMessage.getChannel());
097: } else {
098: final JmxRemoteTunnelMessage messageEnvelope = (JmxRemoteTunnelMessage) context;
099: if (messageEnvelope.getCloseConnection()) {
100: channelRemoved(messageEnvelope.getChannel());
101: } else if (messageEnvelope.getInitConnection()) {
102: logger
103: .warn("Received a JMX tunneled connection init from the remote"
104: + " JMX server, only the JMX client should do this");
105: } else {
106: routeTunneledMessage(messageEnvelope);
107: }
108: }
109: }
110:
111: private void connectToL1JmxServer(final MessageChannel channel) {
112: logger
113: .info("L1["
114: + channel.getChannelID()
115: + "] notified us that their JMX server is now available");
116: EventContext msg = new L1ConnectionMessage(l2MBeanServer,
117: channel, channelIdToJmxConnector,
118: channelIdToMsgConnection, true);
119: synchronized (sinkLock) {
120: if (connectStageSink == null) {
121: throw new AssertionError(
122: "ConnectStageSink was not set.");
123: }
124: connectStageSink.add(msg);
125: }
126: }
127:
128: private void routeTunneledMessage(
129: final JmxRemoteTunnelMessage messageEnvelope) {
130: final Message message = messageEnvelope.getTunneledMessage();
131: final MessageChannel channel = messageEnvelope.getChannel();
132: final ChannelID channelID = channel.getChannelID();
133: final TunnelingMessageConnection tmc;
134: synchronized (channelIdToMsgConnection) {
135: tmc = (TunnelingMessageConnection) channelIdToMsgConnection
136: .get(channelID);
137: }
138: if (tmc != null) {
139: tmc.incomingNetworkMessage(message);
140: } else {
141: logger
142: .warn("Received tunneled JMX message with no associated message connection,"
143: + " sending close() to remote JMX server");
144: final JmxRemoteTunnelMessage closeMessage = (JmxRemoteTunnelMessage) channel
145: .createMessage(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE);
146: closeMessage.setCloseConnection();
147: closeMessage.send();
148: }
149: }
150:
151: public void channelCreated(final MessageChannel channel) {
152: // DEV-16: Instead of immediately interrogating an L1's JMX server as soon as it connects, we wait for the L1 client
153: // to send us a 'L1JmxReady' network message to avoid a startup race condition
154: }
155:
156: public void channelRemoved(final MessageChannel channel) {
157: EventContext msg = new L1ConnectionMessage(null, channel,
158: channelIdToJmxConnector, channelIdToMsgConnection,
159: false);
160: synchronized (sinkLock) {
161: if (connectStageSink == null) {
162: throw new AssertionError(
163: "ConnectStageSink was not set.");
164: }
165: connectStageSink.add(msg);
166: }
167: }
168:
169: public void setConnectStageSink(Sink sink) {
170: synchronized (sinkLock) {
171: if (connectStageSink != null) {
172: logger
173: .warn("Attempted to set ConnectStageSink more than once.");
174: return;
175: }
176: connectStageSink = sink;
177: }
178: }
179: }
|