001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.management.remote.protocol.terracotta;
005:
006: import java.io.IOException;
007: import java.util.LinkedList;
008: import java.util.Map;
009:
010: import javax.management.remote.generic.MessageConnection;
011: import javax.management.remote.message.Message;
012:
013: import com.tc.net.protocol.tcm.MessageChannel;
014: import com.tc.net.protocol.tcm.TCMessageType;
015:
016: public final class TunnelingMessageConnection implements
017: MessageConnection {
018:
019: private final LinkedList inbox;
020: private final MessageChannel channel;
021: private boolean connected;
022: private final boolean isJmxConnectionServer;
023:
024: /**
025: * @param channel outgoing network channel, calls to {@link #writeMessage(Message)} will drop messages here and send
026: * to the other side
027: */
028: public TunnelingMessageConnection(final MessageChannel channel,
029: boolean isJmxConnectionServer) {
030: this .isJmxConnectionServer = isJmxConnectionServer;
031: this .inbox = new LinkedList();
032: this .channel = channel;
033: connected = false;
034: }
035:
036: public synchronized void close() throws IOException {
037: checkConnected();
038: connected = false;
039: synchronized (inbox) {
040: inbox.clear();
041: inbox.notifyAll();
042: }
043: }
044:
045: public synchronized void connect(final Map environment)
046: throws IOException {
047: if (connected) {
048: throw new IOException("Connection is already open");
049: }
050: if (!isJmxConnectionServer) {
051: JmxRemoteTunnelMessage connectMessage = (JmxRemoteTunnelMessage) channel
052: .createMessage(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE);
053: connectMessage.setInitConnection();
054: connectMessage.send();
055: }
056: connected = true;
057: }
058:
059: public String getConnectionId() {
060: return channel.getRemoteAddress().getStringForm();
061: }
062:
063: public Message readMessage() throws IOException,
064: ClassNotFoundException {
065: Message inboundMessage = null;
066: while (inboundMessage == null) {
067: checkConnected();
068: synchronized (inbox) {
069: if (inbox.isEmpty()) {
070: try {
071: inbox.wait();
072: } catch (InterruptedException ie) {
073: throw new IOException(
074: "Interrupted while waiting for inbound message");
075: }
076: } else {
077: inboundMessage = (Message) inbox.removeFirst();
078: inbox.notifyAll();
079: }
080: }
081: }
082: return inboundMessage;
083: }
084:
085: public void writeMessage(final Message outboundMessage)
086: throws IOException {
087: JmxRemoteTunnelMessage messageEnvelope = (JmxRemoteTunnelMessage) channel
088: .createMessage(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE);
089: messageEnvelope.setTunneledMessage(outboundMessage);
090: messageEnvelope.send();
091: }
092:
093: /**
094: * This should only be invoked from the SEDA event handler that receives incoming network messages.
095: */
096: void incomingNetworkMessage(final Message inboundMessage) {
097: synchronized (this ) {
098: if (!connected)
099: return;
100: }
101: synchronized (inbox) {
102: inbox.addLast(inboundMessage);
103: inbox.notifyAll();
104: }
105: }
106:
107: private synchronized void checkConnected() throws IOException {
108: if (!connected) {
109: throw new IOException("Connection has been closed");
110: }
111: }
112:
113: }
|