001: // $Id: CloseTest.java,v 1.11 2006/09/22 11:57:20 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import junit.framework.TestCase;
006: import org.jgroups.*;
007: import org.jgroups.util.Util;
008:
009: import java.util.Vector;
010: import java.lang.management.ThreadMXBean;
011: import java.lang.management.ManagementFactory;
012: import java.lang.management.ThreadInfo;
013:
014: /**
015: * Demos the creation of a channel and subsequent connection and closing. Demo application should exit (no
016: * more threads running)
017: */
018: public class CloseTest extends TestCase {
019: JChannel channel, channel1, channel2, c1, c2, c3;
020: int active_threads = 0;
021:
022: String props = "UDP(mcast_addr=228.8.8.3;mcast_port=45577;ip_ttl=32;"
023: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000;"
024: + "enable_bundling=true;max_bundle_timeout=30;use_incoming_packet_handler=true;loopback=true):"
025: + "PING(timeout=2000;num_initial_members=3):"
026: + "MERGE2(min_interval=5000;max_interval=10000):"
027: + "FD_SOCK:"
028: + "VERIFY_SUSPECT(timeout=1500):"
029: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
030: + "UNICAST(timeout=600,1200,2400):"
031: + "pbcast.STABLE(desired_avg_gossip=20000):"
032: + "FRAG(frag_size=4096;down_thread=false;up_thread=false):"
033: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
034: + "shun=true;print_local_addr=true)";
035:
036: public CloseTest(String name) {
037: super (name);
038: }
039:
040: protected void setUp() throws Exception {
041: super .setUp();
042: String cfg = System.getProperty("config");
043: if (cfg != null)
044: props = cfg;
045: active_threads = Thread.activeCount();
046: System.out.println("active threads before (" + active_threads
047: + "):\n" + Util.activeThreads());
048: }
049:
050: protected void tearDown() throws Exception {
051: super .tearDown();
052: closeChannel(channel);
053: closeChannel(channel1);
054: closeChannel(channel2);
055: closeChannel(c1);
056: closeChannel(c2);
057: closeChannel(c3);
058:
059: int current_active_threads = Thread.activeCount();
060: System.out.println("active threads after ("
061: + current_active_threads + "):\n"
062: + Util.activeThreads());
063: // System.out.println("thread:\n" + dumpThreads());
064: String msg = "";
065: if (active_threads != current_active_threads) {
066: msg = "active threads:\n" + dumpThreads();
067: }
068: assertEquals(msg, active_threads, current_active_threads);
069: }
070:
071: private void closeChannel(JChannel c) {
072: if (c != null && (c.isOpen() || c.isConnected())) {
073: c.close();
074: }
075: }
076:
077: public void testDoubleClose() throws ChannelException {
078: System.out.println("-- creating channel1 --");
079: channel1 = new JChannel(props);
080: System.out.println("-- connecting channel1 --");
081: channel1.connect("bla");
082: System.out.println("-- closing channel1 --");
083: channel1.close();
084: System.out.println("-- closing channel1 (again) --");
085: channel1.close();
086: System.out.println("-- done, threads are ");
087: Util.printThreads();
088: }
089:
090: public void testCreationAndClose() throws Exception {
091: System.out.println("-- creating channel1 --");
092: channel1 = new JChannel(props);
093: System.out.println("-- connecting channel1 --");
094: channel1.connect("CloseTest1");
095: System.out.println("-- closing channel1 --");
096: channel1.close();
097: System.out.println("-- done, threads are ");
098: Util.printThreads();
099: }
100:
101: public void testViewChangeReceptionOnChannelCloseByParticipant()
102: throws Exception {
103: Address a1, a2;
104: Vector members;
105:
106: c1 = new JChannel(props);
107: System.out.println("-- connecting c1");
108: c1.connect("X");
109: Util.sleep(500); // time to receive its own view
110: dumpMessages("c1", c1);
111: a1 = c1.getLocalAddress();
112:
113: c2 = new JChannel(props);
114: System.out.println("-- connecting c2");
115: c2.connect("X");
116: Util.sleep(500); // time to receive its own view
117: a2 = c2.getLocalAddress();
118: dumpMessages("c2", c2);
119:
120: System.out.println("-- closing c2");
121: c2.close();
122: Object obj = c1.receive(100);
123: assertTrue(obj instanceof View);
124: View v = (View) obj;
125: members = v.getMembers();
126: System.out.println("-- first view of c1: " + v);
127: assertEquals(2, members.size());
128: assertTrue(members.contains(a1));
129: assertTrue(members.contains(a2));
130:
131: obj = c1.receive(100);
132: assertTrue(obj instanceof View);
133: v = (View) obj;
134: members = v.getMembers();
135: System.out.println("-- second view of c1: " + v);
136: assertEquals(1, members.size());
137: assertTrue(members.contains(a1));
138: assertFalse(members.contains(a2));
139: }
140:
141: public void testViewChangeReceptionOnChannelCloseByCoordinator()
142: throws Exception {
143: Address a1, a2;
144: Vector members;
145: Object obj;
146: View v;
147:
148: c1 = new JChannel(props);
149: c1.connect("X");
150: Util.sleep(500); // time to receive its own view
151: dumpMessages("c1", c1);
152: a1 = c1.getLocalAddress();
153:
154: c2 = new JChannel(props);
155: c2.connect("X");
156: Util.sleep(500); // time to receive its own view
157: a2 = c2.getLocalAddress();
158: v = (View) c2.receive(1);
159: members = v.getMembers();
160: assertEquals(2, members.size());
161: assertTrue(members.contains(a2));
162:
163: c1.close();
164: Util.sleep(500);
165:
166: System.out.println("queue of c2 is " + c2.dumpQueue());
167: assertTrue("found 0 messages in channel",
168: c2.getNumMessages() > 0);
169: obj = c2.receive(0);
170: assertTrue(obj instanceof View);
171: v = (View) obj;
172: members = v.getMembers();
173: assertEquals(1, members.size());
174: assertFalse(members.contains(a1));
175: assertTrue(members.contains(a2));
176:
177: assertEquals(0, c2.getNumMessages());
178: }
179:
180: private void dumpMessages(String msg, JChannel ch) throws Exception {
181: while (ch.getNumMessages() > 0) {
182: Object obj = ch.receive(0);
183: if (obj instanceof View)
184: System.out.println(msg + ": " + obj);
185: }
186: }
187:
188: public void testConnectDisconnectConnectCloseSequence()
189: throws ChannelException {
190: System.out.println("-- creating channel --");
191: channel = new JChannel(props);
192: System.out.println("-- connecting channel to CloseTest1--");
193: channel.connect("CloseTest1");
194: System.out.println("view is " + channel.getView());
195: System.out.println("-- disconnecting channel --");
196: channel.disconnect();
197: System.out.println("-- connecting channel to OtherGroup --");
198: channel.connect("OtherGroup");
199: System.out.println("view is " + channel.getView());
200: System.out.println("-- closing channel --");
201: channel.close();
202: System.out.println("-- done, threads are ");
203: Util.printThreads();
204: }
205:
206: public void testConnectCloseSequenceWith2Members()
207: throws ChannelException {
208: System.out.println("-- creating channel --");
209: channel = new JChannel(props);
210: System.out.println("-- connecting channel --");
211: channel.connect("X");
212: System.out.println("view is " + channel.getView());
213:
214: System.out.println("-- creating channel1 --");
215: channel1 = new JChannel(props);
216: System.out.println("-- connecting channel1 --");
217: channel1.connect("X");
218: System.out.println("view is " + channel1.getView());
219:
220: System.out.println("-- closing channel1 --");
221: channel1.close();
222:
223: Util.sleep(2000);
224: System.out.println("-- closing channel --");
225: channel.close();
226: }
227:
228: public void testCreationAndClose2() throws Exception {
229: System.out.println("-- creating channel2 --");
230: channel2 = new JChannel(props);
231: System.out.println("-- connecting channel2 --");
232: channel2.connect("CloseTest2");
233: System.out.println("-- closing channel --");
234: channel2.close();
235: Util.sleep(2000);
236: Util.printThreads();
237: }
238:
239: public void testChannelClosedException() throws Exception {
240: System.out.println("-- creating channel --");
241: channel = new JChannel(props);
242: System.out.println("-- connecting channel --");
243: channel.connect("CloseTestLoop");
244: System.out.println("-- closing channel --");
245: channel.close();
246: Util.sleep(2000);
247:
248: try {
249: channel.connect("newGroup");
250: fail(); // cannot connect to a closed channel
251: } catch (ChannelClosedException ex) {
252: assertTrue(true);
253: }
254: }
255:
256: public void testCreationAndCloseLoop() throws Exception {
257: System.out.println("-- creating channel --");
258: channel = new JChannel(props);
259:
260: for (int i = 1; i <= 10; i++) {
261: System.out.println("-- connecting channel (attempt #" + i
262: + " ) --");
263: channel.connect("CloseTestLoop2");
264: System.out.println("-- closing channel --");
265: channel.close();
266:
267: System.out.println("-- reopening channel --");
268: channel.open();
269: }
270: channel.close();
271: }
272:
273: public void testShutdown() throws Exception {
274: System.out.println("-- creating channel --");
275: channel = new JChannel(props);
276: System.out.println("-- connecting channel --");
277: channel.connect("bla");
278: System.out.println("-- shutting down channel --");
279: channel.shutdown();
280:
281: Thread threads[] = new Thread[Thread.activeCount()];
282: Thread.enumerate(threads);
283: System.out.println("-- active threads:");
284: for (int i = 0; i < threads.length; i++)
285: System.out.println(threads[i]);
286: assertTrue(threads.length < 5);
287: }
288:
289: public void testMultipleConnectsAndDisconnects() throws Exception {
290: c1 = new JChannel(props);
291: assertTrue(c1.isOpen());
292: assertFalse(c1.isConnected());
293: c1.connect("bla");
294: assertTrue(c1.isOpen());
295: assertTrue(c1.isConnected());
296: assertServiceAndClusterView(c1, 1);
297:
298: c2 = new JChannel(props);
299: assertTrue(c2.isOpen());
300: assertFalse(c2.isConnected());
301:
302: c2.connect("bla");
303: assertTrue(c2.isOpen());
304: assertTrue(c2.isConnected());
305: assertServiceAndClusterView(c2, 2);
306: Util.sleep(500);
307: assertServiceAndClusterView(c1, 2);
308:
309: c2.disconnect();
310: assertTrue(c2.isOpen());
311: assertFalse(c2.isConnected());
312: Util.sleep(500);
313: assertServiceAndClusterView(c1, 1);
314:
315: c2.connect("bla");
316: assertTrue(c2.isOpen());
317: assertTrue(c2.isConnected());
318: assertServiceAndClusterView(c2, 2);
319: Util.sleep(300);
320: assertServiceAndClusterView(c1, 2);
321:
322: // Now see what happens if we reconnect the first channel
323: c3 = new JChannel(props);
324: assertTrue(c3.isOpen());
325: assertFalse(c3.isConnected());
326: assertServiceAndClusterView(c1, 2);
327: assertServiceAndClusterView(c2, 2);
328:
329: c1.disconnect();
330: assertTrue(c1.isOpen());
331: assertFalse(c1.isConnected());
332: assertServiceAndClusterView(c2, 1);
333: assertTrue(c3.isOpen());
334: assertFalse(c3.isConnected());
335:
336: c1.connect("bla");
337: assertTrue(c1.isOpen());
338: assertTrue(c1.isConnected());
339: assertServiceAndClusterView(c1, 2);
340: Util.sleep(500);
341: assertServiceAndClusterView(c2, 2);
342: assertTrue(c3.isOpen());
343: assertFalse(c3.isConnected());
344: }
345:
346: private void assertServiceAndClusterView(Channel ch, int num) {
347: View view = ch.getView();
348: String msg = "view=" + view;
349: assertNotNull(view);
350: assertEquals(msg, num, view.size());
351: }
352:
353: /* CAUTION: JDK 5 specific code */
354:
355: private String dumpThreads() {
356: StringBuffer sb = new StringBuffer();
357: ThreadMXBean bean = ManagementFactory.getThreadMXBean();
358: long[] ids = bean.getAllThreadIds();
359: ThreadInfo[] threads = bean.getThreadInfo(ids, 20);
360: for (int i = 0; i < threads.length; i++) {
361: ThreadInfo info = threads[i];
362: if (info == null)
363: continue;
364: sb.append(info.getThreadName()).append(":\n");
365: StackTraceElement[] stack_trace = info.getStackTrace();
366: for (int j = 0; j < stack_trace.length; j++) {
367: StackTraceElement el = stack_trace[j];
368: sb.append("at ").append(el.getClassName()).append(".")
369: .append(el.getMethodName());
370: sb.append("(").append(el.getFileName()).append(":")
371: .append(el.getLineNumber()).append(")");
372: sb.append("\n");
373: }
374: sb.append("\n\n");
375: }
376: return sb.toString();
377: }
378:
379: public static void main(String[] args) {
380: String[] testCaseName = { CloseTest.class.getName() };
381: junit.textui.TestRunner.main(testCaseName);
382: }
383:
384: }
|