001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.net;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
008:
009: import com.tc.logging.TCLogger;
010: import com.tc.logging.TCLogging;
011: import com.tc.net.TCSocketAddress;
012: import com.tc.net.groups.ClientID;
013: import com.tc.net.groups.NodeID;
014: import com.tc.net.protocol.tcm.ChannelID;
015: import com.tc.net.protocol.tcm.ChannelManager;
016: import com.tc.net.protocol.tcm.ChannelManagerEventListener;
017: import com.tc.net.protocol.tcm.MessageChannel;
018: import com.tc.net.protocol.tcm.MessageChannelInternal;
019: import com.tc.net.protocol.tcm.TCMessageType;
020: import com.tc.object.msg.BatchTransactionAcknowledgeMessage;
021: import com.tc.object.msg.ClientHandshakeAckMessage;
022: import com.tc.util.concurrent.CopyOnWriteArrayMap;
023:
024: import java.util.Collection;
025: import java.util.HashSet;
026: import java.util.Iterator;
027: import java.util.List;
028: import java.util.Set;
029:
030: /**
031: * Wraps the generic ChannelManager to hide it from the rest of the DSO world and to provide delayed visibility of
032: * clients and hides the channel to client ID mapping from the rest of the world.
033: */
034: public class DSOChannelManagerImpl implements DSOChannelManager,
035: DSOChannelManagerMBean {
036: private static final TCLogger logger = TCLogging
037: .getLogger(DSOChannelManager.class);
038:
039: private final CopyOnWriteArrayMap activeChannels = new CopyOnWriteArrayMap(
040: new CopyOnWriteArrayMap.TypedArrayFactory() {
041:
042: public Object[] createTypedArray(int size) {
043: return new MessageChannel[size];
044: }
045:
046: });
047: private final List eventListeners = new CopyOnWriteArrayList();
048:
049: private final ChannelManager genericChannelManager;
050: private final String serverVersion;
051:
052: public DSOChannelManagerImpl(ChannelManager genericChannelManager,
053: String serverVersion) {
054: this .genericChannelManager = genericChannelManager;
055: this .genericChannelManager
056: .addEventListener(new GenericChannelEventListener());
057: this .serverVersion = serverVersion;
058: }
059:
060: public MessageChannel getActiveChannel(NodeID id)
061: throws NoSuchChannelException {
062: final MessageChannel rv = (MessageChannel) activeChannels
063: .get(id);
064: if (rv == null) {
065: throw new NoSuchChannelException("No such channel: " + id);
066: }
067: return rv;
068: }
069:
070: public void closeAll(Collection clientIDs) {
071: for (Iterator i = clientIDs.iterator(); i.hasNext();) {
072: ClientID id = (ClientID) i.next();
073:
074: MessageChannel channel = genericChannelManager
075: .getChannel(id.getChannelID());
076: if (channel != null) {
077: channel.close();
078: }
079: }
080: }
081:
082: public MessageChannel[] getActiveChannels() {
083: return (MessageChannel[]) activeChannels.valuesToArray();
084: }
085:
086: public boolean isActiveID(NodeID nodeID) {
087: return activeChannels.containsKey(nodeID);
088: }
089:
090: public String getChannelAddress(NodeID nid) {
091: try {
092: MessageChannel channel = getActiveChannel(nid);
093: TCSocketAddress addr = channel.getRemoteAddress();
094: return addr.getStringForm();
095: } catch (NoSuchChannelException e) {
096: return "no longer connected";
097: }
098: }
099:
100: public BatchTransactionAcknowledgeMessage newBatchTransactionAcknowledgeMessage(
101: NodeID nid) throws NoSuchChannelException {
102: return (BatchTransactionAcknowledgeMessage) getActiveChannel(
103: nid).createMessage(
104: TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE);
105: }
106:
107: private ClientHandshakeAckMessage newClientHandshakeAckMessage(
108: ClientID clientID) throws NoSuchChannelException {
109: MessageChannelInternal channel = genericChannelManager
110: .getChannel(clientID.getChannelID());
111: if (channel == null) {
112: throw new NoSuchChannelException();
113: }
114: return (ClientHandshakeAckMessage) channel
115: .createMessage(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE);
116: }
117:
118: public Set getAllActiveClientIDs() {
119: return activeChannels.keySet();
120: }
121:
122: public void makeChannelActive(ClientID clientID, long startIDs,
123: long endIDs, boolean persistent) {
124: try {
125: ClientHandshakeAckMessage ackMsg = newClientHandshakeAckMessage(clientID);
126: MessageChannel channel = ackMsg.getChannel();
127: synchronized (activeChannels) {
128: activeChannels.put(clientID, channel);
129: ackMsg.initialize(startIDs, endIDs, persistent,
130: getAllActiveClientIDsString(), clientID
131: .toString(), serverVersion);
132: ackMsg.send();
133: }
134: fireChannelCreatedEvent(channel);
135: } catch (NoSuchChannelException nsce) {
136: logger
137: .warn("Not sending handshake message to disconnected client: "
138: + clientID);
139: }
140: }
141:
142: private Set getAllActiveClientIDsString() {
143: Set clientIDStrings = new HashSet();
144: synchronized (activeChannels) {
145: for (Iterator i = activeChannels.keySet().iterator(); i
146: .hasNext();) {
147: ClientID cid = (ClientID) i.next();
148: clientIDStrings.add(cid.toString());
149: }
150: }
151: return clientIDStrings;
152: }
153:
154: public void makeChannelActiveNoAck(MessageChannel channel) {
155: activeChannels.put(getClientIDFor(channel.getChannelID()),
156: channel);
157: }
158:
159: public void addEventListener(DSOChannelManagerEventListener listener) {
160: if (listener == null) {
161: throw new NullPointerException("listener cannot be be null");
162: }
163: eventListeners.add(listener);
164: }
165:
166: public Set getAllClientIDs() {
167: Set channelIDs = genericChannelManager.getAllChannelIDs();
168: Set clientIDs = new HashSet(channelIDs.size());
169: for (Iterator i = channelIDs.iterator(); i.hasNext();) {
170: ChannelID cid = (ChannelID) i.next();
171: clientIDs.add(getClientIDFor(cid));
172: }
173: return clientIDs;
174: }
175:
176: private void fireChannelCreatedEvent(MessageChannel channel) {
177: for (Iterator iter = eventListeners.iterator(); iter.hasNext();) {
178: DSOChannelManagerEventListener eventListener = (DSOChannelManagerEventListener) iter
179: .next();
180: eventListener.channelCreated(channel);
181: }
182: }
183:
184: private void fireChannelRemovedEvent(MessageChannel channel) {
185: for (Iterator iter = eventListeners.iterator(); iter.hasNext();) {
186: DSOChannelManagerEventListener eventListener = (DSOChannelManagerEventListener) iter
187: .next();
188: eventListener.channelRemoved(channel);
189: }
190: }
191:
192: private class GenericChannelEventListener implements
193: ChannelManagerEventListener {
194:
195: public void channelCreated(MessageChannel channel) {
196: // nothing
197: }
198:
199: public void channelRemoved(MessageChannel channel) {
200: activeChannels
201: .remove(getClientIDFor(channel.getChannelID()));
202: fireChannelRemovedEvent(channel);
203: }
204:
205: }
206:
207: public ClientID getClientIDFor(ChannelID channelID) {
208: return new ClientID(channelID);
209: }
210:
211: }
|