001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.cluster;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009:
010: import java.util.Arrays;
011: import java.util.HashMap;
012: import java.util.IdentityHashMap;
013: import java.util.Iterator;
014: import java.util.Map;
015:
016: public class Cluster {
017:
018: private static final TCLogger logger = TCLogging
019: .getLogger(Cluster.class);
020: private static final boolean debug = false;
021:
022: private final Map nodes = new HashMap(); // <String, Node>
023: private final IdentityHashMap listeners = new IdentityHashMap(); // <ClusterEventListener>
024:
025: private Node this Node;
026:
027: public synchronized Node getThisNode() {
028: return this Node;
029: }
030:
031: // for tests
032: Map getNodes() {
033: return nodes;
034: }
035:
036: public synchronized void this NodeConnected(final String this NodeId,
037: String[] nodesCurrentlyInCluster) {
038: // we might get multiple calls in a row, ignore all but the first in a row.
039: if (this Node != null)
040: return;
041:
042: this Node = new Node(this NodeId);
043: nodes.put(this Node.getNodeId(), this Node);
044:
045: for (int i = 0; i < nodesCurrentlyInCluster.length; i++) {
046: Node n = new Node(nodesCurrentlyInCluster[i]);
047: nodes.put(n.getNodeId(), n);
048: }
049:
050: debug("### Cluster: thisNodeConnected -> " + this );
051: fireThisNodeConnectedEvent();
052: }
053:
054: public synchronized void this NodeDisconnected() {
055: debug("### Cluster: thisNodeDisconnected -> " + this );
056: if (this Node == null) {
057: // client channels be closed before we know thisNodeID. Skip the disconnect event in this case
058: return;
059: }
060: fireThisNodeDisconnectedEvent();
061: nodes.clear();
062: this Node = null;
063: }
064:
065: public synchronized void nodeConnected(String nodeId) {
066: Node n = new Node(nodeId);
067:
068: // the server should not be sending this event to us
069: if (n.equals(this Node)) {
070: throw new AssertionError("received message for self");
071: }
072:
073: nodes.put(n.getNodeId(), n);
074: debug("### Cluster: nodeConnected -> " + this );
075: fireNodeConnectedEvent(nodeId);
076: }
077:
078: public synchronized void nodeDisconnected(String nodeId) {
079: nodes.remove(nodeId);
080: debug("### Cluster: nodeDisconnected -> " + this );
081: fireNodeDisconnectedEvent(nodeId);
082: }
083:
084: public synchronized String toString() {
085: // NOTE: this method is used in the error logging
086: return "Cluster{ thisNode=" + this Node + ", nodesInCluster="
087: + nodes.keySet() + "}";
088: }
089:
090: public synchronized void addClusterEventListener(
091: ClusterEventListener cel) {
092: // If this assertion is going off, you're (probably) trying to add your listener too early
093: assertPre(this Node != null);
094:
095: Object oldCel = listeners.put(cel, cel);
096: if (oldCel == null) {
097: fireThisNodeConnectedEvent(cel, getCurrentNodeIds());
098: }
099: }
100:
101: private void fireThisNodeConnectedEvent(ClusterEventListener cel,
102: String[] ids) {
103: try {
104: cel.this NodeConnected(this Node.getNodeId(), ids);
105: } catch (Throwable t) {
106: log(t);
107: }
108: }
109:
110: private void fireThisNodeConnectedEvent() {
111: assertPre(this Node != null);
112: final String ids[] = getCurrentNodeIds();
113: for (Iterator i = listeners.keySet().iterator(); i.hasNext();) {
114: ClusterEventListener l = (ClusterEventListener) i.next();
115: fireThisNodeConnectedEvent(l, ids);
116: }
117: }
118:
119: private void fireThisNodeDisconnectedEvent() {
120: for (Iterator i = listeners.keySet().iterator(); i.hasNext();) {
121: ClusterEventListener l = (ClusterEventListener) i.next();
122: try {
123: l.this NodeDisconnected(this Node.getNodeId());
124: } catch (Throwable t) {
125: log(t);
126: }
127: }
128: }
129:
130: private void fireNodeConnectedEvent(String newNodeId) {
131: if (this Node == null) {
132: return;
133: }
134: for (Iterator i = listeners.keySet().iterator(); i.hasNext();) {
135: ClusterEventListener l = (ClusterEventListener) i.next();
136: try {
137: l.nodeConnected(newNodeId);
138: } catch (Throwable t) {
139: log(t);
140: }
141: }
142: }
143:
144: private void fireNodeDisconnectedEvent(String nodeId) {
145: if (this Node == null) {
146: return;
147: }
148: for (Iterator i = listeners.keySet().iterator(); i.hasNext();) {
149: ClusterEventListener l = (ClusterEventListener) i.next();
150: try {
151: l.nodeDisconnected(nodeId);
152: } catch (Throwable t) {
153: log(t);
154: }
155: }
156: }
157:
158: private static void assertPre(boolean b) {
159: if (!b)
160: throw new AssertionError("Pre-condition failed!");
161: }
162:
163: private String[] getCurrentNodeIds() {
164: final String[] rv = new String[nodes.size()];
165: nodes.keySet().toArray(rv);
166: Arrays.sort(rv);
167: return rv;
168: }
169:
170: private static void debug(String string) {
171: if (debug)
172: System.err.println(string);
173: }
174:
175: private void log(Throwable t) {
176: logger
177: .error("Unhandled exception in event callback " + this,
178: t);
179: }
180: }
|