001: package org.jgroups.tests;
002:
003: import junit.framework.TestCase;
004: import org.apache.commons.logging.Log;
005: import org.apache.commons.logging.LogFactory;
006: import org.jgroups.Header;
007: import org.jgroups.JChannel;
008: import org.jgroups.Message;
009: import org.jgroups.MessageListener;
010: import org.jgroups.blocks.PullPushAdapter;
011: import org.jgroups.debug.Debugger;
012:
013: import java.io.IOException;
014: import java.io.ObjectInput;
015: import java.io.ObjectOutput;
016: import java.util.Iterator;
017: import java.util.Vector;
018:
019: /**
020: * This test case checks ordering of messages using the Encrypt protocol
021: * using <code>UDP</code>protocols. It can be run
022: * as JUnit test case or in the command line. Parameters are:
023: * <ul>
024: * <li><code>-sleep n</code> - means that after each message sending, sender
025: * thread will sleep for <code>n</code> milliseconds;
026: * <li><code>-msg_num n</code> - <code>n</code> is number of messages to send;
027: * <li><code>-debug</code> - pop-up protocol debugger;
028: * <li><code>-cummulative</code> - debugger shows cummulative messages.
029: * </ul>
030: * $Id: EncryptMessageOrderTestCase.java,v 1.1.14.1 2007/04/26 21:32:06 vlada Exp $
031: */
032: public class EncryptMessageOrderTestCase extends TestCase {
033:
034: public static boolean USE_DEBUGGER = false;
035:
036: public static boolean CUMMULATIVE = false;
037:
038: public static int MESSAGE_NUMBER = 5 * 100;
039:
040: public static boolean SLEEP_BETWEEN_SENDING = false;
041:
042: public static int SLEEP_TIME = 1;
043:
044: String groupName = "ENCRYPT_ORDER_TEST";
045:
046: boolean orderCounterFailure = false;
047:
048: protected Log log = LogFactory.getLog(this .getClass());
049:
050: public static final String properties = "EncryptNoKeyStore.xml";
051:
052: /**
053: * Constructor to create test case.
054: */
055: public EncryptMessageOrderTestCase(String string) {
056: super (string);
057: }
058:
059: protected JChannel channel1;
060: protected PullPushAdapter adapter1;
061: protected Debugger debugger1;
062:
063: protected JChannel channel2;
064: protected PullPushAdapter adapter2;
065: protected Debugger debugger2;
066:
067: /**
068: * Print selected options before test starts.
069: */
070: protected static void printSelectedOptions() {
071: System.out.println("will sleep : " + SLEEP_BETWEEN_SENDING);
072: if (SLEEP_BETWEEN_SENDING)
073: System.out.println("sleep time : " + SLEEP_TIME);
074:
075: System.out.println("msg num : " + MESSAGE_NUMBER);
076:
077: }
078:
079: /**
080: * Set up unit test. It might add protocol
081: * stack debuggers if such option was selected at startup.
082: */
083: protected void setUp() throws Exception {
084: super .setUp();
085: printSelectedOptions();
086:
087: channel1 = new JChannel(properties);
088: System.out.print("Connecting to channel...");
089: channel1.connect(groupName);
090: System.out.println("channel1 connected, view is "
091: + channel1.getView());
092:
093: adapter1 = new PullPushAdapter(channel1);
094:
095: if (USE_DEBUGGER) {
096: debugger1 = new Debugger(channel1, CUMMULATIVE, "channel 1");
097: debugger1.start();
098: }
099:
100: // sleep one second before second member joins
101: try {
102: Thread.sleep(1000);
103: } catch (InterruptedException ex) {
104: }
105:
106: channel2 = new JChannel(properties);
107: channel2.connect(groupName);
108: System.out.println("channel2 connected, view is "
109: + channel2.getView());
110:
111: adapter2 = new PullPushAdapter(channel2);
112:
113: if (USE_DEBUGGER) {
114: debugger2 = new Debugger(channel2, CUMMULATIVE, "channel 2");
115: debugger2.start();
116: }
117:
118: // sleep one second before processing continues
119: try {
120: Thread.sleep(1000);
121: } catch (InterruptedException ex) {
122: }
123:
124: }
125:
126: /**
127: * Tears down test case. This method closes all opened channels.
128: */
129: protected void tearDown() throws Exception {
130: super .tearDown();
131:
132: adapter2.stop();
133: channel2.close();
134:
135: adapter1.stop();
136: channel1.close();
137: }
138:
139: protected boolean finishedReceiving;
140:
141: /**
142: * Test method. This method adds a message listener to the PullPushAdapter
143: * on channel 1, and starts sending specified number of messages into
144: * channel 1 or 2 depending if we are in loopback mode or not. Each message
145: * containg timestamp when it was created. By measuring time on message
146: * delivery we can calculate message trip time. Listener is controlled by
147: * two string messages "start" and "stop". After sender has finished to
148: * send messages, it waits until listener receives all messages or "stop"
149: * message. Then test is finished and calculations are showed.
150: * <p/>
151: * Also we calculate how much memory
152: * was allocated before excuting a test and after executing a test.
153: */
154: public void testLoad() {
155: try {
156: final String startMessage = "start";
157: final String stopMessage = "stop";
158:
159: final Object mutex = new Object();
160:
161: final Vector receivedTimes = new Vector(MESSAGE_NUMBER);
162: final Vector normalMessages = new Vector(MESSAGE_NUMBER);
163: final Vector tooQuickMessages = new Vector();
164: final Vector tooSlowMessages = new Vector();
165:
166: if (USE_DEBUGGER) {
167: System.out.println("Press any key to continue...");
168: try {
169: System.in.read();
170: } catch (java.io.IOException ioex) {
171: }
172: }
173:
174: adapter1.setListener(new MessageListener() {
175: private boolean started = false;
176: private boolean stopped = false;
177:
178: private long counter = 0L;
179:
180: public byte[] getState() {
181: return null;
182: }
183:
184: public void setState(byte[] state) {
185: }
186:
187: public void receive(Message jgMessage) {
188: Object message = jgMessage.getObject();
189:
190: if (startMessage.equals(message)) {
191: started = true;
192: finishedReceiving = false;
193: } else if (stopMessage.equals(message)) {
194: stopped = true;
195: finishedReceiving = true;
196:
197: synchronized (mutex) {
198: mutex.notifyAll();
199: }
200:
201: } else if (message instanceof Long) {
202: Long travelTime = new Long(System
203: .currentTimeMillis()
204: - ((Long) message).longValue());
205:
206: try {
207: assertEquals(
208: counter,
209: ((EncryptOrderTestHeader) ((Message) jgMessage)
210: .getHeader("EncryptOrderTest")).seqno);
211: counter++;
212: } catch (Exception e) {
213: log.warn(e);
214: orderCounterFailure = true;
215: }
216: if (!started)
217: tooQuickMessages.add(message);
218: else if (started && !stopped) {
219: receivedTimes.add(travelTime);
220: normalMessages.add(message);
221: } else
222: tooSlowMessages.add(message);
223: }
224: }
225: });
226:
227: System.out.println("Free memory: "
228: + Runtime.getRuntime().freeMemory());
229: System.out.println("Total memory: "
230: + Runtime.getRuntime().totalMemory());
231: System.out.println("Starting sending messages.");
232:
233: long time = System.currentTimeMillis();
234:
235: Message startJgMessage = new Message();
236: startJgMessage.setObject(startMessage);
237:
238: JChannel sender = channel2;
239:
240: sender.send(startJgMessage);
241:
242: for (int i = 0; i < MESSAGE_NUMBER; i++) {
243: Long message = new Long(System.currentTimeMillis());
244:
245: Message jgMessage = new Message();
246: jgMessage.putHeader("EncryptOrderTest",
247: new EncryptOrderTestHeader(i));
248: jgMessage.setObject(message);
249:
250: sender.send(jgMessage);
251:
252: if (i % 1000 == 0)
253: System.out.println("sent " + i + " messages.");
254:
255: if (SLEEP_BETWEEN_SENDING)
256: org.jgroups.util.Util.sleep(1, true);
257: }
258:
259: Message stopJgMessage = new Message();
260: stopJgMessage.setObject(stopMessage);
261: sender.send(stopJgMessage);
262:
263: time = System.currentTimeMillis() - time;
264:
265: System.out
266: .println("Finished sending messages. Operation took "
267: + time);
268:
269: synchronized (mutex) {
270:
271: int received = 0;
272:
273: while (!finishedReceiving) {
274: mutex.wait(1000);
275:
276: if (receivedTimes.size() != received) {
277: received = receivedTimes.size();
278:
279: System.out.println();
280: System.out.print("Received "
281: + receivedTimes.size() + " messages.");
282: } else {
283: System.out.print(".");
284: }
285: }
286: }
287:
288: try {
289: Thread.sleep(1000);
290: } catch (Exception ex) {
291: }
292:
293: double avgDeliveryTime = -1.0;
294: long maxDeliveryTime = Long.MIN_VALUE;
295: long minDeliveryTime = Long.MAX_VALUE;
296:
297: Iterator iterator = receivedTimes.iterator();
298: while (iterator.hasNext()) {
299: Long message = (Long) iterator.next();
300:
301: if (avgDeliveryTime == -1.0)
302: avgDeliveryTime = message.longValue();
303: else
304: avgDeliveryTime = (avgDeliveryTime + message
305: .doubleValue()) / 2.0;
306:
307: if (message.longValue() > maxDeliveryTime)
308: maxDeliveryTime = message.longValue();
309:
310: if (message.longValue() < minDeliveryTime)
311: minDeliveryTime = message.longValue();
312: }
313:
314: System.out.println("Sent " + MESSAGE_NUMBER + " messages.");
315: System.out.println("Received " + receivedTimes.size()
316: + " messages.");
317: System.out.println("Average delivery time "
318: + avgDeliveryTime + " ms");
319: System.out.println("Minimum delivery time "
320: + minDeliveryTime + " ms");
321: System.out.println("Maximum delivery time "
322: + maxDeliveryTime + " ms");
323: System.out.println("Received " + tooQuickMessages.size()
324: + " too quick messages");
325: System.out.println("Received " + tooSlowMessages.size()
326: + " too slow messages");
327: } catch (Exception ex) {
328: ex.printStackTrace();
329: }
330:
331: System.out.println("Free memory: "
332: + Runtime.getRuntime().freeMemory());
333: System.out.println("Total memory: "
334: + Runtime.getRuntime().totalMemory());
335:
336: System.out.println("Performing GC");
337:
338: Runtime.getRuntime().gc();
339:
340: try {
341: Thread.sleep(2000);
342: } catch (InterruptedException ex) {
343: }
344:
345: System.out.println("Free memory: "
346: + Runtime.getRuntime().freeMemory());
347: System.out.println("Total memory: "
348: + Runtime.getRuntime().totalMemory());
349:
350: if (USE_DEBUGGER) {
351: System.out.println("Press any key to finish...");
352: try {
353: System.in.read();
354: } catch (java.io.IOException ioex) {
355: }
356: }
357: assertTrue("Message ordering is incorrect - check log output",
358: (!orderCounterFailure));
359: }
360:
361: public static class EncryptOrderTestHeader extends Header {
362:
363: long seqno = -1; // either reg. NAK_ACK_MSG or first_seqno in retransmissions
364:
365: public EncryptOrderTestHeader() {
366: }
367:
368: public EncryptOrderTestHeader(long seqno) {
369:
370: this .seqno = seqno;
371: }
372:
373: public long size() {
374: return 512;
375: }
376:
377: public void writeExternal(ObjectOutput out) throws IOException {
378:
379: out.writeLong(seqno);
380: }
381:
382: public void readExternal(ObjectInput in) throws IOException,
383: ClassNotFoundException {
384:
385: seqno = in.readLong();
386:
387: }
388:
389: public EncryptOrderTestHeader copy() {
390: EncryptOrderTestHeader ret = new EncryptOrderTestHeader(
391: seqno);
392: return ret;
393: }
394:
395: public String toString() {
396: StringBuffer ret = new StringBuffer();
397: ret.append("[ENCRYPT_ORDER_TEST: seqno=" + seqno);
398: ret.append(']');
399:
400: return ret.toString();
401: }
402:
403: }
404:
405: /**
406: * Main method to start a test case from the command line. Parameters are:
407: * <ul>
408: * <li><code>-sleep n</code> - means that after each message sending, sender
409: * thread will sleep for <code>n</code> milliseconds;
410: * <li><code>-msg_num n</code> - <code>n</code> is number of messages to send;;
411: * <li><code>-debug</code> - pop-up protocol debugger;
412: * <li><code>-cummulative</code> - debugger shows cummulative messages.
413: * </ul>
414: */
415: public static void main(String[] args) {
416: for (int i = 0; i < args.length; i++) {
417: if ("-sleep".equals(args[i])) {
418: SLEEP_BETWEEN_SENDING = true;
419: if (!(i < args.length - 1))
420: throw new RuntimeException(
421: "You have to specify sleep time");
422:
423: try {
424: SLEEP_TIME = Integer.parseInt(args[++i]);
425: } catch (NumberFormatException nfex) {
426: throw new RuntimeException(
427: "Cannot parse sleep time");
428: }
429:
430: continue;
431: } else if ("-msg_num".equals(args[i])) {
432: if (!(i < args.length - 1))
433: throw new RuntimeException(
434: "You have to specify messages number");
435:
436: try {
437: MESSAGE_NUMBER = Integer.parseInt(args[++i]);
438: } catch (NumberFormatException nfex) {
439: throw new RuntimeException(
440: "Cannot parse messages number");
441: }
442:
443: continue;
444: }
445:
446: else if ("-debug".equals(args[i])) {
447: USE_DEBUGGER = true;
448:
449: continue;
450: } else if ("-cummulative".equals(args[i])) {
451: CUMMULATIVE = true;
452: continue;
453: } else if ("-help".equals(args[i])) {
454: help();
455: return;
456: }
457: }
458:
459: junit.textui.TestRunner.run(EncryptMessageOrderTestCase.class);
460: }
461:
462: static void help() {
463: System.out
464: .println("EncryptOrderTest [-help] [-sleep <sleep time between sends (ms)>] "
465: + " [-msg_num <number of msgs to send>] [-debug [-cummulative]]");
466: }
467:
468: }
|