001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tctest;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
008: import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
009:
010: import com.tc.cluster.ClusterEventListener;
011: import com.tc.object.bytecode.ManagerUtil;
012: import com.tc.object.config.ConfigVisitor;
013: import com.tc.object.config.DSOClientConfigHelper;
014: import com.tc.simulator.app.ApplicationConfig;
015: import com.tc.simulator.listener.ListenerProvider;
016: import com.tc.util.concurrent.ThreadUtil;
017:
018: import java.util.HashSet;
019:
020: public class ClusterMembershipEventTestApp extends
021: ServerCrashingAppBase implements ClusterEventListener {
022:
023: public ClusterMembershipEventTestApp(String appId,
024: ApplicationConfig config, ListenerProvider listenerProvider) {
025: super (appId, config, listenerProvider);
026: }
027:
028: private final int initialNodeCount = getParticipantCount();
029: private final CyclicBarrier barrier = new CyclicBarrier(
030: initialNodeCount);
031:
032: // not shared..
033: private final SynchronizedInt nodeConCnt = new SynchronizedInt(0);
034: private final SynchronizedInt nodeDisCnt = new SynchronizedInt(0);
035: private final SynchronizedInt this NodeConCnt = new SynchronizedInt(
036: 0);
037: private final SynchronizedInt this NodeDisCnt = new SynchronizedInt(
038: 0);
039: private final HashSet nodes = new HashSet();
040: private String this Node;
041:
042: public void runTest() throws Throwable {
043: ManagerUtil.addClusterEventListener(this );
044:
045: check(1, this NodeConCnt.get(), "thisNodeConnected");
046:
047: waitForNodes(initialNodeCount);
048:
049: System.err
050: .println("### stage 1 [all nodes connected]: thisNode="
051: + this Node + ", threadId="
052: + Thread.currentThread().getName());
053:
054: clearCounters();
055:
056: final boolean isMasterNode = barrier.barrier() == 0;
057:
058: if (isMasterNode) {
059: System.err.println("### masterNode=" + this Node
060: + " -> crashing server...");
061: getConfig().getServerControl().crash();
062: // this sleep should be longer than l1-reconnect timeout
063: ThreadUtil.reallySleep(30 * 1000);
064: System.err.println("### masterNode=" + this Node
065: + " -> crashed server");
066: System.err.println("### masterNode=" + this Node
067: + " -> restarting server...");
068: getConfig().getServerControl().start();
069: System.err.println("### masterNode=" + this Node
070: + " -> restarted server");
071: }
072: System.err.println("### stage 2 [reconnecting]: thisNode="
073: + this Node + ", threadId="
074: + Thread.currentThread().getName());
075: barrier.barrier();
076: waitForNodes(initialNodeCount);
077: check(1, this NodeDisCnt.get(), "thisNodeDisconnected");
078: check(1, this NodeConCnt.get(), "thisNodeConnected");
079:
080: clearCounters();
081: check(0, nodeConCnt.get(), "nodeConnected");
082: check(0, nodeDisCnt.get(), "nodeDisconnected");
083: barrier.barrier();
084: System.err.println("### stage 3 [reconnected]: thisNode="
085: + this Node + ", threadId="
086: + Thread.currentThread().getName());
087:
088: if (isMasterNode) {
089: // master node blocks until new client exists...
090: spawnNewClient("0", L1Client.class);
091: }
092: barrier.barrier();
093: System.err
094: .println("### stage 4 [new client disconnected]: thisNode="
095: + this Node
096: + ", threadId="
097: + Thread.currentThread().getName());
098:
099: waitForNodes(initialNodeCount);
100: check(1, nodeConCnt.get(), "nodeConnected");
101: check(1, nodeDisCnt.get(), "nodeDisconnected");
102: clearCounters();
103: barrier.barrier();
104: System.err.println("### stage 5 [all done]: thisNode="
105: + this Node + ", threadId="
106: + Thread.currentThread().getName());
107: }
108:
109: private void clearCounters() {
110: this .nodeConCnt.set(0);
111: this .nodeDisCnt.set(0);
112: this .this NodeConCnt.set(0);
113: this .this NodeDisCnt.set(0);
114: }
115:
116: private void waitForNodes(int expectedSize) {
117: while (true) {
118: synchronized (nodes) {
119: if (nodes.size() == expectedSize)
120: break;
121: try {
122: nodes.wait();
123: } catch (InterruptedException e) {
124: notifyError(e);
125: }
126: }
127: }
128: }
129:
130: private void check(int expectedMin, int actual, String msg) {
131: // NOTE: on some systems (Solaris, Win) we get multiple disconnect/connect events
132: // per one logical disconnect/connect occurance.
133: if (actual < expectedMin)
134: notifyError(msg + " expectedMin=" + expectedMin
135: + ", actual=" + actual + ", thisNodeId=" + this Node);
136: }
137:
138: public void nodeConnected(String nodeId) {
139: new Throwable(
140: "### TRACE: ClusterMembershipEventTestApp.nodeConnected()")
141: .printStackTrace();
142: nodeConCnt.increment();
143: System.err.println("\n### nodeConnected: thisNode=" + this Node
144: + ", nodeId=" + nodeId + ", threadId="
145: + Thread.currentThread().getName() + ", cnt="
146: + nodeConCnt.get());
147: synchronized (nodes) {
148: nodes.add(nodeId);
149: nodes.notifyAll();
150: }
151: }
152:
153: public void nodeDisconnected(String nodeId) {
154: new Throwable(
155: "### TRACE: ClusterMembershipEventTestApp.nodeDisconnected()")
156: .printStackTrace();
157: nodeDisCnt.increment();
158: System.err.println("\n### nodeDisconnected: thisNode="
159: + this Node + ", nodeId=" + nodeId + ", threadId="
160: + Thread.currentThread().getName() + ", cnt="
161: + nodeDisCnt.get());
162: synchronized (nodes) {
163: nodes.remove(nodeId);
164: nodes.notifyAll();
165: }
166: }
167:
168: public void this NodeConnected(String this NodeId,
169: String[] nodesCurrentlyInCluster) {
170: new Throwable(
171: "### TRACE: ClusterMembershipEventTestApp.thisNodeConnected()")
172: .printStackTrace();
173: this NodeConCnt.increment();
174: this Node = this NodeId;
175: System.err.println("\n### thisNodeConnected->thisNodeId="
176: + this NodeId + ", threadId="
177: + Thread.currentThread().getName() + ", cnt="
178: + this NodeConCnt.get());
179: synchronized (nodes) {
180: nodes.add(this Node);
181: for (int i = 0; i < nodesCurrentlyInCluster.length; i++) {
182: nodes.add(nodesCurrentlyInCluster[i]);
183: }
184: nodes.notifyAll();
185: }
186: }
187:
188: public void this NodeDisconnected(String this NodeId) {
189: new Throwable(
190: "### TRACE: ClusterMembershipEventTestApp.thisNodeDisconnected()")
191: .printStackTrace();
192: this NodeDisCnt.increment();
193: System.err.println("\n### thisNodeDisconnected->thisNodeId="
194: + this NodeId + ", threadId="
195: + Thread.currentThread().getName() + ", cnt="
196: + this NodeDisCnt.get());
197: synchronized (nodes) {
198: nodes.clear();
199: nodes.notifyAll();
200: }
201: }
202:
203: public static void visitL1DSOConfig(ConfigVisitor visitor,
204: DSOClientConfigHelper config) {
205: config.addIncludePattern(ClusterMembershipEventTestApp.class
206: .getName());
207: config.addRoot("barrier", ClusterMembershipEventTestApp.class
208: .getName()
209: + ".barrier");
210: config.addWriteAutolock("* "
211: + ClusterMembershipEventTestApp.class.getName()
212: + ".*(..)");
213:
214: config.addIncludePattern(CyclicBarrier.class.getName());
215: config.addWriteAutolock("* " + CyclicBarrier.class.getName()
216: + ".*(..)");
217: }
218:
219: public static class L1Client {
220: public static void main(String args[]) {
221: // nothing to do
222: }
223: }
224:
225: }
|