001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.management.remote.protocol.terracotta;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.EventContext;
009: import com.tc.async.api.EventHandlerException;
010: import com.tc.logging.TCLogger;
011: import com.tc.logging.TCLogging;
012: import com.tc.net.protocol.tcm.ChannelEvent;
013: import com.tc.net.protocol.tcm.ChannelEventListener;
014: import com.tc.net.protocol.tcm.ChannelEventType;
015: import com.tc.net.protocol.tcm.MessageChannel;
016: import com.tc.net.protocol.tcm.TCMessage;
017: import com.tc.net.protocol.tcm.TCMessageType;
018: import com.tc.util.concurrent.SetOnceFlag;
019:
020: import java.io.IOException;
021:
022: import javax.management.remote.generic.MessageConnection;
023: import javax.management.remote.message.Message;
024:
025: public final class TunnelingEventHandler extends AbstractEventHandler
026: implements ChannelEventListener {
027:
028: private static final TCLogger logger = TCLogging
029: .getLogger(TunnelingEventHandler.class);
030:
031: private final MessageChannel channel;
032:
033: private TunnelingMessageConnection messageConnection;
034:
035: private boolean acceptOk;
036:
037: private Object jmxReadyLock;
038:
039: private SetOnceFlag localJmxServerReady;
040:
041: private boolean transportConnected;
042:
043: private boolean sentReadyMessage;
044:
045: public TunnelingEventHandler(final MessageChannel channel) {
046: this .channel = channel;
047: this .channel.addListener(this );
048: acceptOk = false;
049: jmxReadyLock = new Object();
050: localJmxServerReady = new SetOnceFlag();
051: transportConnected = false;
052: sentReadyMessage = false;
053: }
054:
055: public void handleEvent(final EventContext context)
056: throws EventHandlerException {
057: final JmxRemoteTunnelMessage messageEnvelope = (JmxRemoteTunnelMessage) context;
058: if (messageEnvelope.getCloseConnection()) {
059: reset();
060: } else {
061: final Message message = messageEnvelope
062: .getTunneledMessage();
063: synchronized (this ) {
064: if (messageEnvelope.getInitConnection()) {
065: if (messageConnection != null) {
066: logger
067: .warn("Received a client connection initialization, resetting existing connection");
068: reset();
069: }
070: messageConnection = new TunnelingMessageConnection(
071: channel, true);
072: acceptOk = true;
073: notifyAll();
074: } else if (messageConnection == null) {
075: logger
076: .warn("Received unexpected data message, connection is not yet established");
077: } else {
078: if (message != null) {
079: messageConnection
080: .incomingNetworkMessage(message);
081: } else {
082: logger
083: .warn("Received tunneled message with no data, resetting connection");
084: reset();
085: }
086: }
087: }
088: }
089: }
090:
091: synchronized MessageConnection accept() throws IOException {
092: while (!acceptOk) {
093: try {
094: wait();
095: } catch (InterruptedException ie) {
096: logger
097: .warn(
098: "Interrupted while waiting for a new connection",
099: ie);
100: throw new IOException(
101: "Interrupted while waiting for new connection: "
102: + ie.getMessage());
103: }
104: }
105: acceptOk = false;
106: return messageConnection;
107: }
108:
109: private synchronized void reset() {
110: if (messageConnection != null) {
111: try {
112: messageConnection.close();
113: } catch (IOException ioe) {
114: logger
115: .warn("Caught I/O exception while closing tunneled JMX connection");
116: }
117: }
118: messageConnection = null;
119: acceptOk = false;
120: synchronized (jmxReadyLock) {
121: sentReadyMessage = false;
122: }
123: notifyAll();
124: }
125:
126: public void notifyChannelEvent(final ChannelEvent event) {
127: if (event.getChannel() == channel) {
128: if (event.getType() == ChannelEventType.TRANSPORT_CONNECTED_EVENT) {
129: synchronized (jmxReadyLock) {
130: transportConnected = true;
131: }
132: sendJmxReadyMessageIfNecessary();
133: } else if (event.getType() == ChannelEventType.CHANNEL_CLOSED_EVENT
134: || event.getType() == ChannelEventType.TRANSPORT_DISCONNECTED_EVENT) {
135: reset();
136: synchronized (jmxReadyLock) {
137: transportConnected = false;
138: }
139: }
140: }
141: }
142:
143: public void jmxIsReady() {
144: synchronized (jmxReadyLock) {
145: localJmxServerReady.set();
146: }
147:
148: sendJmxReadyMessageIfNecessary();
149: }
150:
151: /**
152: * Once the local JMX server has successfully started (this happens in a
153: * background thread as DSO is so early in the startup process that the
154: * system JMX server in 1.5+ can't be created inline with other
155: * initialization) we send a 'ready' message to the L2 each time we connect
156: * to it. This tells the L2 that they can connect to our local JMX server
157: * and see the beans we have in the DSO client.
158: */
159: private void sendJmxReadyMessageIfNecessary() {
160: final boolean send;
161: synchronized (jmxReadyLock) {
162: send = localJmxServerReady.isSet() && transportConnected
163: && !sentReadyMessage;
164: if (send) {
165: sentReadyMessage = true;
166: }
167: }
168:
169: // Doing this message send outside of the jmxReadyLock to avoid a
170: // deadlock (CDV-132)
171: if (send) {
172: logger
173: .info("Client JMX server ready; sending notification to L2 server");
174: TCMessage readyMessage = channel
175: .createMessage(TCMessageType.CLIENT_JMX_READY_MESSAGE);
176: readyMessage.send();
177: }
178:
179: }
180: }
|