001: package org.jgroups.tests;
002:
003: import org.jgroups.*;
004: import org.jgroups.stack.IpAddress;
005: import org.jgroups.stack.Protocol;
006: import org.jgroups.stack.ProtocolObserver;
007: import org.jgroups.util.Util;
008:
009: import java.io.BufferedReader;
010: import java.io.File;
011: import java.io.FileWriter;
012: import java.io.InputStreamReader;
013: import java.util.Date;
014:
/**
 * <h1>ContinousThroughputTest.java</h1>
 * <p/>
 * This is a program for running throughput tests.
 * <p/>
 * The program assumes it runs on a reliable network where no partitioning or failures happen
 * (apart from the cping test, which handles view changes).
 * Once you run the program, it connects the channel and gives you a prompt.
 * Every time a new view is received you will see it printed.
 * Once you have launched the program on all the machines you use for the test, just type,
 * on one machine, the command for the test you want to run; you will be asked for the necessary
 * parameters, then the test starts.
 * Depending on the chosen test you will see its results on the monitor, and they are logged
 * to a file in the working directory called
 * <code>"ContinousThroughputTest&lt;hostname&gt;&lt;timestamp&gt;.log"</code>, where the timestamp
 * is <code>System.currentTimeMillis() / 10000</code>.
 *
 * @author Gianluca Collot
 * @version 1.0
 */
032:
033: public class ContinousThroughputTest {
034: String props = "UDP:" + "PING(up_thread=false;down_thread=false):"
035: + "FD(timeout=1000;shun=false):"
036: + "STABLE(up_thread=false;down_thread=false):"
037: + "MERGE(up_thread=false;down_thread=false):" + "NAKACK:"
038: + "FLUSH:" + "GMS:"
039: + "VIEW_ENFORCER(up_thread=false;down_thread=false):" +
040: // "TSTAU:" +
041: "QUEUE(up_thread=false;down_thread=false)";
042: // String props= "TCP:TCPPING(initial_hosts=manolete2[8880]):FD(timeout=10000):" +
043: // "STABLE:MERGE:NAKACK:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE";
044: JChannel channel = null;
045: Thread sendThread, receiveThread;
046: boolean coordinator = false;
047: IpAddress my_addr = null;
048: View view;
049: BufferedReader reader;
050: float troughputSum = 0, meanTroughput = 0, minTroughput = 10000,
051: maxTroughput = 0;
052: int numTests = 0;
053: FileWriter logWriter;
054: Protocol prot = null;
055:
056: /**
057: * Creates threads, creates and connects channel opens log file
058: */
059:
060: public ContinousThroughputTest() {
061: sendThread = new Thread("sendThread") {
062: public void run() {
063: parser();
064: }
065: };
066: receiveThread = new Thread("receiveThread") {
067: public void run() {
068: checkChannel();
069: }
070: };
071: reader = new BufferedReader(new InputStreamReader(System.in));
072: try {
073: channel = new JChannel(props);
074: // prot = (Protocol) channel.getProtocolStack().getProtocols().lastElement();
075: // prot.setObserver(new ContinousThroughputTest.MessageLenghtObserver());
076: channel.setOpt(Channel.BLOCK, Boolean.FALSE);
077: channel.connect("Janus");
078: } catch (Exception ex) {
079: System.out.println("Connection Failed!" + ex);
080: System.exit(1);
081: }
082: my_addr = (IpAddress) channel.getLocalAddress();
083:
084: try {
085: File log = new File("ContinousThroughputTest"
086: + my_addr.getIpAddress().getHostName()
087: + (System.currentTimeMillis() / 10000) + ".log");
088: if (!log.exists()) {
089: log.createNewFile();
090: }
091: logWriter = new FileWriter(log);
092: logWriter.write("ContinousThroughputTest.java log\r\n");
093: logWriter.write("Date:"
094: + new Date(System.currentTimeMillis()) + "\r\n");
095: log("Protocol Stack is " + props);
096: System.out.println("Protocol Stack is " + props);
097: } catch (Exception ex) {
098: System.out.println("File problems " + ex);
099: System.exit(5);
100: }
101: }
102:
103: public static void main(String[] args) {
104: ContinousThroughputTest perfTest = new ContinousThroughputTest();
105: perfTest.go();
106: }
107:
108: void go() {
109: // Starts Receiving
110: receiveThread.start();
111: // Starts input Parser
112: sendThread.start();
113: }
114:
115: /**
116: * This function should be called in its own thread.
117: * It recives messages and calculates the troughput
118: */
119:
120: public void checkChannel() {
121: String payload = null;
122: Object received = null;
123: Message msg = null;
124: boolean done = false;
125: long n;
126: int i = 1;
127:
128: System.out.println("Started receiving");
129: try {
130: while (!done) {
131: received = channel.receive(0);
132: if (received instanceof Message) {
133: msg = (Message) received;
134: payload = (String) msg.getObject();
135: System.out.println(payload);
136: if ("stop".equalsIgnoreCase(payload)) {
137: done = true;
138: }
139: if ("pingpong".equalsIgnoreCase(payload)) {
140: n = ((Long) ((Message) channel.receive(0))
141: .getObject()).longValue();
142: i = ((Integer) ((Message) channel.receive(0))
143: .getObject()).intValue();
144: log("Starting pingpong test. Rounds: " + n
145: + " Bursts: " + i);
146: pingpongTest(n, i, false);
147: }
148: if ("cping".equalsIgnoreCase(payload)) {
149: // i = ((Integer) ((Message) channel.receive(0)).getObject()).intValue();
150: log("Starting cping test. Bursts: " + 1);
151: cpingTest(1, true);
152: }
153: if ("sweep".equalsIgnoreCase(payload)) {
154: n = ((Long) ((Message) channel.receive(0))
155: .getObject()).longValue();
156: i = ((Integer) ((Message) channel.receive(0))
157: .getObject()).intValue();
158: log("Starting sweep test. Rounds: " + n
159: + " initial burst: " + i);
160: sweep(n, i);
161: }
162: }
163: if (received instanceof View) {
164: view = (View) received;
165: System.out.println(view);
166: if (view.getMembers().elementAt(0).equals(my_addr)) {
167: System.out.println("I'm the new Coordinator");
168: coordinator = true;
169: }
170: resetData();
171: }
172: }
173: } catch (Exception ex) {
174: System.out.println("checkChannel() :" + ex);
175: try {
176: logWriter.write("Stopped cause " + ex + "\r\n");
177: } catch (Exception e) {
178: }
179: System.exit(2);
180: }
181: System.out.println("Stopped Receiving");
182:
183: channel.disconnect();
184: System.out.println("Disconnected from \"Janus\"");
185: channel.close();
186: System.out.println("Channel Closed");
187: System.exit(0);
188: }
189:
190: /**
191: * This function should be run in its own thread and sends messages on an already connected channel
192: */
193: public void parser() {
194: boolean done = false;
195: String input;
196: int number = 0;
197: int burstlength = 1;
198:
199: System.out.println("Ready.");
200: try {
201: while (!done) {
202: input = reader.readLine();
203: if ("stop".equalsIgnoreCase(input)) {
204: done = true;
205: }
206: if ("pingpong".equalsIgnoreCase(input)) {
207: number = askNumber(reader, "How many rounds?");
208: burstlength = askNumber(reader, "Length of bursts?");
209: channel.send(new Message(null, null, input));
210: channel.send(new Message(null, null, new Long(
211: number)));
212: channel.send(new Message(null, null, new Integer(
213: burstlength)));
214: continue;
215:
216: }
217: if ("cping".equalsIgnoreCase(input)) {
218: // burstlength = askNumber( reader,"Length of bursts?");
219: channel.send(new Message(null, null, input));
220: // channel.send(new Message(null,null,new Integer(burstlength)));
221: continue;
222: }
223: if ("sweep".equalsIgnoreCase(input)) {
224: number = askNumber(reader, "Number of tests");
225: burstlength = askNumber(reader,
226: "Initial length of bursts?");
227: channel.send(new Message(null, null, input));
228: channel.send(new Message(null, null, new Long(
229: number)));
230: channel.send(new Message(null, null, new Integer(
231: burstlength)));
232: continue;
233: }
234: channel.send(new Message(null, null, input));
235: }
236: } catch (Exception ex) {
237: System.out.println(ex);
238: }
239: }
240:
241: /**
242: * sendBurst(int n): sends a burst of messages with small payload
243: */
244:
245: void sendBurst(long n) {
246: try {
247: byte[] buf = Util.objectToByteBuffer("Standard Mex");
248: for (int i = 0; i < n; i++) {
249: channel.send(new Message(null, null, buf));
250: }
251: } catch (Exception ex) {
252: System.out.println("sendBurst: " + ex);
253: }
254: }
255:
256: /**
257: * showStats: Prints resulting times and troughput
258: */
259:
260: void showStats(long start, long stop, long messages, int burstlength) {
261: String result;
262: long elapsedTime = (stop - start);
263: long troughPut = (messages * 1000) / elapsedTime;
264: // troughputSum += troughPut;
265: maxTroughput = (maxTroughput > troughPut) ? maxTroughput
266: : troughPut;
267: minTroughput = (minTroughput < troughPut) ? minTroughput
268: : troughPut;
269: // System.out.println("Elapsed Time: " + (stop-start) + " milliseconds to receive " + messages + " messages");
270: result = "Elapsed Time: " + (stop - start) + "| messages:"
271: + messages + "| burst length:" + burstlength
272: + "| Troughput:" + troughPut + "| max: " + maxTroughput
273: + "| min: " + minTroughput + "\r\n";
274: System.out.println(result);
275: try {
276: logWriter.write(result);
277: logWriter.flush();
278: } catch (Exception ex) {
279: System.out.println("showStats():" + ex);
280: }
281:
282: }
283:
284: int askNumber(BufferedReader reader, String text) {
285: int number = 0;
286: String input = "10";
287: System.out.println(text);
288: try {
289: input = reader.readLine();
290: } catch (Exception ex) {
291: System.out.println("AskNumber :" + ex);
292: }
293:
294: number = Integer.parseInt(input);
295: return number;
296: }
297:
298: /**
299: * Resets stored statistics and counters
300: */
301:
302: void resetData() {
303: maxTroughput = 0;
304: minTroughput = 10000;
305: meanTroughput = 0;
306: numTests = 0;
307: troughputSum = 0;
308: }
309:
310: /**
311: * Make a pingpong test:
312: * For n times a message is sent and view.size() messages are received
313: * Every 1000 messages sent the throughput is evaluated or at the end of the test
314: */
315: void pingpongTest(long n, int burst_length,
316: boolean partialResultsPrint) {
317: long i = 0;
318: long start = System.currentTimeMillis();
319: long tempstart = System.currentTimeMillis();
320: long stop, throughput;
321: try {
322: for (i = 0; i < n; i++) {
323: for (int k = 0; k < burst_length; k++)
324: channel.send(new Message(null, null, new Long(i)));
325: for (int j = 0; j < (view.size() * burst_length); j++) {
326: channel.receive(20000);
327: }
328: if (partialResultsPrint && ((i % 1000) == 0)) {
329: if (i == 0)
330: continue;
331: stop = System.currentTimeMillis();
332: throughput = (1000000 / (stop - tempstart))
333: * view.size() * burst_length;
334: try {
335: System.out.println(new Date(stop).toString()
336: + " : " + throughput);
337: logWriter.write(new Date(stop).toString()
338: + " : " + throughput);
339: logWriter.write("\r\n");
340: logWriter.flush();
341: tempstart = System.currentTimeMillis();
342: } catch (Exception ex) {
343: ex.printStackTrace();
344: }
345: }
346: }
347: } catch (TimeoutException ex) {
348: System.out.println("Timeout Receiving, round: " + i);
349: System.exit(5);
350: } catch (Exception ex) {
351: ex.printStackTrace();
352: System.exit(4);
353: }
354: stop = System.currentTimeMillis();
355: showStats(start, stop, n * view.size() * burst_length,
356: burst_length);
357: }
358:
359: void sweep(long tests, int burstlenght) {
360: long messagespertest = 10000;
361: for (int i = 0; i < tests; i++) {
362: burstlenght += i;
363: pingpongTest(messagespertest / burstlenght, burstlenght,
364: false);
365: }
366: }
367:
368: /**
369: * Makes a continous test handling view changes
370: */
371: void cpingTest(int burst_lenght, boolean printoutput) {
372: Object recvd = null;
373: long start = System.currentTimeMillis();
374: for (long i = 1; i < Long.MAX_VALUE; i++) {
375: // System.out.println("Round: " + i);
376: try {
377: channel.send(null, null, "cping");
378: for (int j = 0; j < burst_lenght * view.size();) {
379: recvd = channel.receive(10000);
380: if (recvd instanceof View) {
381: view = (View) recvd;
382: System.out.println(view);
383: log(view.toString());
384: } else {
385: j++;
386: }
387: }
388: } catch (TimeoutException tex) {
389: try {
390: channel.send(new Message(null, null, "cping"));
391: System.out.println("Resent a message for timeout");
392: log("Resent a message for timeout");
393: } catch (Exception ex) {
394: System.exit(9);
395: }
396: } catch (Exception ex) {
397: System.exit(9);
398: }
399: if ((i % 1000) == 0) {
400: long stop = System.currentTimeMillis();
401: long throughput = i * 1000 * view.size()
402: / (stop - start);
403: System.out.println("Througputh = " + throughput);
404: log("Througputh = " + throughput);
405: start = System.currentTimeMillis();
406: i = 0;
407: }
408: }
409: }
410:
411: /**
412: * Used to print messages lenght and their serialized contents.
413: */
414:
415: public static class MessageLenghtObserver implements
416: ProtocolObserver {
417:
418: public void setProtocol(Protocol prot) {
419: /** todo: Implement this org.jgroups.debug.ProtocolObserver method*/
420: throw new java.lang.UnsupportedOperationException(
421: "Method setProtocol() not yet implemented.");
422: }
423:
424: public boolean up(Event evt, int num_evts) {
425: /** todo: Implement this org.jgroups.debug.ProtocolObserver method*/
426: throw new java.lang.UnsupportedOperationException(
427: "Method up() not yet implemented.");
428: }
429:
430: public boolean passUp(Event evt) {
431: return true;
432: }
433:
434: public boolean down(Event evt, int num_evts) {
435: return true;
436: }
437:
438: public boolean passDown(Event evt) {
439: byte[] buf = null;
440: if (evt.getType() == Event.MSG)
441: try {
442: buf = Util.objectToByteBuffer(evt.getArg());
443: System.out.println("UDP: sending a message of "
444: + buf.length + "bytes");
445: System.out.println("Message was :");
446: System.out.println(new String(buf));
447: } catch (Exception ex) {
448:
449: }
450: return true;
451: }
452: }
453:
454: void log(String str) {
455: try {
456: logWriter.write(str + "\r\n");
457: logWriter.flush();
458: } catch (Exception ex) {
459:
460: }
461: }
462: }
|