001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.demos;
018:
019: import java.io.Serializable;
020: import java.util.Random;
021:
022: import org.apache.catalina.tribes.ByteMessage;
023: import org.apache.catalina.tribes.ChannelException;
024: import org.apache.catalina.tribes.ChannelListener;
025: import org.apache.catalina.tribes.ManagedChannel;
026: import org.apache.catalina.tribes.Member;
027: import org.apache.catalina.tribes.MembershipListener;
028: import org.apache.catalina.tribes.io.XByteBuffer;
029: import org.apache.catalina.tribes.Channel;
030: import java.io.Externalizable;
031: import org.apache.juli.logging.Log;
032: import org.apache.juli.logging.LogFactory;
033:
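/**
 * Command-line load tester for a Tribes channel.
 *
 * <p>Depending on the selected mode, an instance sends {@link LoadMessage}
 * payloads to all current channel members from one or more sender threads
 * and counts the messages it receives, logging throughput statistics at a
 * configurable message interval.</p>
 *
 * @author not attributable
 * @version 1.0
 */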
044: public class LoadTest implements MembershipListener, ChannelListener,
045: Runnable {
046: protected static Log log = LogFactory.getLog(LoadTest.class);
047: public static int size = 24000;
048: public static Object mutex = new Object();
049: public boolean doRun = true;
050:
051: public long bytesReceived = 0;
052: public float mBytesReceived = 0;
053: public int messagesReceived = 0;
054: public boolean send = true;
055: public boolean debug = false;
056: public int msgCount = 100;
057: ManagedChannel channel = null;
058: public int statsInterval = 10000;
059: public long pause = 0;
060: public boolean breakonChannelException = false;
061: public boolean async = false;
062: public long receiveStart = 0;
063: public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
064:
065: static int messageSize = 0;
066:
067: public static long messagesSent = 0;
068: public static long messageStartSendTime = 0;
069: public static long messageEndSendTime = 0;
070: public static int threadCount = 0;
071:
072: public static synchronized void startTest() {
073: threadCount++;
074: if (messageStartSendTime == 0)
075: messageStartSendTime = System.currentTimeMillis();
076: }
077:
078: public static synchronized void endTest() {
079: threadCount--;
080: if (messageEndSendTime == 0 && threadCount == 0)
081: messageEndSendTime = System.currentTimeMillis();
082: }
083:
084: public static synchronized long addSendStats(long count) {
085: messagesSent += count;
086: return 0l;
087: }
088:
089: private static void printSendStats(long counter, int messageSize) {
090: float cnt = (float) counter;
091: float size = (float) messageSize;
092: float time = (float) (System.currentTimeMillis() - messageStartSendTime) / 1000f;
093: log.info("****SEND STATS-" + Thread.currentThread().getName()
094: + "*****" + "\n\tMessage count:" + counter
095: + "\n\tTotal bytes :" + (long) (size * cnt)
096: + "\n\tTotal seconds:" + (time) + "\n\tBytes/second :"
097: + (size * cnt / time) + "\n\tMBytes/second:"
098: + (size * cnt / time / 1024f / 1024f));
099: }
100:
101: public LoadTest(ManagedChannel channel, boolean send, int msgCount,
102: boolean debug, long pause, int stats, boolean breakOnEx) {
103: this .channel = channel;
104: this .send = send;
105: this .msgCount = msgCount;
106: this .debug = debug;
107: this .pause = pause;
108: this .statsInterval = stats;
109: this .breakonChannelException = breakOnEx;
110: }
111:
112: public void run() {
113:
114: long counter = 0;
115: long total = 0;
116: LoadMessage msg = new LoadMessage();
117: int messageSize = LoadTest.messageSize;
118:
119: try {
120: startTest();
121: while (total < msgCount) {
122: if (channel.getMembers().length == 0 || (!send)) {
123: synchronized (mutex) {
124: try {
125: mutex.wait();
126: } catch (InterruptedException x) {
127: log.info("Thread interrupted from wait");
128: }
129: }
130: } else {
131: try {
132: //msg.setMsgNr((int)++total);
133: counter++;
134: if (debug) {
135: printArray(msg.getMessage());
136: }
137: channel.send(channel.getMembers(), msg,
138: channelOptions);
139: if (pause > 0) {
140: if (debug)
141: System.out
142: .println("Pausing sender for "
143: + pause + " ms.");
144: Thread.sleep(pause);
145: }
146: } catch (ChannelException x) {
147: if (debug)
148: log.error("Unable to send message:"
149: + x.getMessage(), x);
150: log.error("Unable to send message:"
151: + x.getMessage());
152: ChannelException.FaultyMember[] faulty = x
153: .getFaultyMembers();
154: for (int i = 0; i < faulty.length; i++)
155: log.error("Faulty: " + faulty[i]);
156: --counter;
157: if (this .breakonChannelException)
158: throw x;
159: }
160: }
161: if ((counter % statsInterval) == 0 && (counter > 0)) {
162: //add to the global counter
163: counter = addSendStats(counter);
164: //print from the global counter
165: //printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime);
166: printSendStats(LoadTest.messagesSent,
167: LoadTest.messageSize);
168:
169: }
170:
171: }
172: } catch (Exception x) {
173: log.error("Captured error while sending:" + x.getMessage());
174: if (debug)
175: log.error("", x);
176: printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
177: }
178: endTest();
179: }
180:
181: /**
182: * memberAdded
183: *
184: * @param member Member
185: * @todo Implement this org.apache.catalina.tribes.MembershipListener
186: * method
187: */
188: public void memberAdded(Member member) {
189: log.info("Member added:" + member);
190: synchronized (mutex) {
191: mutex.notifyAll();
192: }
193: }
194:
195: /**
196: * memberDisappeared
197: *
198: * @param member Member
199: * @todo Implement this org.apache.catalina.tribes.MembershipListener
200: * method
201: */
202: public void memberDisappeared(Member member) {
203: log.info("Member disappeared:" + member);
204: }
205:
206: public boolean accept(Serializable msg, Member mbr) {
207: return (msg instanceof LoadMessage)
208: || (msg instanceof ByteMessage);
209: }
210:
211: public void messageReceived(Serializable msg, Member mbr) {
212: if (receiveStart == 0)
213: receiveStart = System.currentTimeMillis();
214: if (debug) {
215: if (msg instanceof LoadMessage) {
216: printArray(((LoadMessage) msg).getMessage());
217: }
218: }
219:
220: if (msg instanceof ByteMessage && !(msg instanceof LoadMessage)) {
221: LoadMessage tmp = new LoadMessage();
222: tmp.setMessage(((ByteMessage) msg).getMessage());
223: msg = tmp;
224: tmp = null;
225: }
226:
227: bytesReceived += ((LoadMessage) msg).getMessage().length;
228: mBytesReceived += ((float) ((LoadMessage) msg).getMessage().length) / 1024f / 1024f;
229: messagesReceived++;
230: if ((messagesReceived % statsInterval) == 0
231: || (messagesReceived == msgCount)) {
232: float bytes = (float) (((LoadMessage) msg).getMessage().length * messagesReceived);
233: float seconds = ((float) (System.currentTimeMillis() - receiveStart)) / 1000f;
234: log.info("****RECEIVE STATS-"
235: + Thread.currentThread().getName() + "*****"
236: + "\n\tMessage count :" + (long) messagesReceived
237: + "\n\tMessage/sec :" + messagesReceived
238: / seconds + "\n\tTotal bytes :" + (long) bytes
239: + "\n\tTotal mbytes :" + (long) mBytesReceived
240: + "\n\tTime since 1st:" + seconds + " seconds"
241: + "\n\tBytes/second :" + (bytes / seconds)
242: + "\n\tMBytes/second :"
243: + (mBytesReceived / seconds) + "\n");
244:
245: }
246: }
247:
248: public static void printArray(byte[] data) {
249: System.out.print("{");
250: for (int i = 0; i < data.length; i++) {
251: System.out.print(data[i]);
252: System.out.print(",");
253: }
254: System.out.println("} size:" + data.length);
255: }
256:
257: //public static class LoadMessage implements Serializable {
258: public static class LoadMessage extends ByteMessage implements
259: Serializable {
260:
261: public static byte[] outdata = new byte[size];
262: public static Random r = new Random(System.currentTimeMillis());
263:
264: public static int getMessageSize(LoadMessage msg) {
265: int messageSize = msg.getMessage().length;
266: if (((Object) msg) instanceof ByteMessage)
267: return messageSize;
268: try {
269: messageSize = XByteBuffer.serialize(new LoadMessage()).length;
270: log.info("Average message size:" + messageSize
271: + " bytes");
272: } catch (Exception x) {
273: log.error("Unable to calculate test message size.", x);
274: }
275: return messageSize;
276: }
277:
278: static {
279: r.nextBytes(outdata);
280: }
281:
282: protected byte[] message = getMessage();
283:
284: public LoadMessage() {
285: }
286:
287: public byte[] getMessage() {
288: if (message == null) {
289: message = outdata;
290: }
291: return message;
292: }
293:
294: public void setMessage(byte[] data) {
295: this .message = data;
296: }
297: }
298:
299: public static void usage() {
300: System.out.println("Tribes Load tester.");
301: System.out
302: .println("The load tester can be used in sender or received mode or both");
303: System.out
304: .println("Usage:\n\t"
305: + "java LoadTest [options]\n\t"
306: + "Options:\n\t\t"
307: + "[-mode receive|send|both] \n\t\t"
308: + "[-startoptions startflags (default is Channel.DEFAULT) ] \n\t\t"
309: + "[-debug] \n\t\t"
310: + "[-count messagecount] \n\t\t"
311: + "[-stats statinterval] \n\t\t"
312: + "[-pause nrofsecondstopausebetweensends] \n\t\t"
313: + "[-threads numberofsenderthreads] \n\t\t"
314: + "[-size messagesize] \n\t\t"
315: + "[-sendoptions channeloptions] \n\t\t"
316: + "[-break (halts execution on exception)]\n"
317: + "[-shutdown (issues a channel.stop() command after send is completed)]\n"
318: + "\tChannel options:"
319: + ChannelCreator.usage()
320: + "\n\n"
321: + "Example:\n\t"
322: + "java LoadTest -port 4004\n\t"
323: + "java LoadTest -bind 192.168.0.45 -port 4005\n\t"
324: + "java LoadTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
325: }
326:
327: public static void main(String[] args) throws Exception {
328: boolean send = true;
329: boolean debug = false;
330: long pause = 0;
331: int count = 1000000;
332: int stats = 10000;
333: boolean breakOnEx = false;
334: int threads = 1;
335: boolean shutdown = false;
336: int startoptions = Channel.DEFAULT;
337: int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
338: if (args.length == 0) {
339: args = new String[] { "-help" };
340: }
341: for (int i = 0; i < args.length; i++) {
342: if ("-threads".equals(args[i])) {
343: threads = Integer.parseInt(args[++i]);
344: } else if ("-count".equals(args[i])) {
345: count = Integer.parseInt(args[++i]);
346: System.out.println("Sending " + count + " messages.");
347: } else if ("-pause".equals(args[i])) {
348: pause = Long.parseLong(args[++i]) * 1000;
349: } else if ("-break".equals(args[i])) {
350: breakOnEx = true;
351: } else if ("-shutdown".equals(args[i])) {
352: shutdown = true;
353: } else if ("-stats".equals(args[i])) {
354: stats = Integer.parseInt(args[++i]);
355: System.out.println("Stats every " + stats + " message");
356: } else if ("-sendoptions".equals(args[i])) {
357: channelOptions = Integer.parseInt(args[++i]);
358: System.out.println("Setting send options to "
359: + channelOptions);
360: } else if ("-startoptions".equals(args[i])) {
361: startoptions = Integer.parseInt(args[++i]);
362: System.out.println("Setting start options to "
363: + startoptions);
364: } else if ("-size".equals(args[i])) {
365: size = Integer.parseInt(args[++i]) - 4;
366: System.out.println("Message size will be:" + (size + 4)
367: + " bytes");
368: } else if ("-mode".equals(args[i])) {
369: if ("receive".equals(args[++i]))
370: send = false;
371: } else if ("-debug".equals(args[i])) {
372: debug = true;
373: } else if ("-help".equals(args[i])) {
374: usage();
375: System.exit(1);
376: }
377: }
378:
379: ManagedChannel channel = (ManagedChannel) ChannelCreator
380: .createChannel(args);
381:
382: LoadTest test = new LoadTest(channel, send, count, debug,
383: pause, stats, breakOnEx);
384: test.channelOptions = channelOptions;
385: LoadMessage msg = new LoadMessage();
386:
387: messageSize = LoadMessage.getMessageSize(msg);
388: channel.addChannelListener(test);
389: channel.addMembershipListener(test);
390: channel.start(startoptions);
391: Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
392: while (threads > 1) {
393: Thread t = new Thread(test);
394: t.setDaemon(true);
395: t.start();
396: threads--;
397: test = new LoadTest(channel, send, count, debug, pause,
398: stats, breakOnEx);
399: test.channelOptions = channelOptions;
400: }
401: test.run();
402: if (shutdown && send)
403: channel.stop(channel.DEFAULT);
404: System.out
405: .println("System test complete, sleeping to let threads finish.");
406: Thread.sleep(60 * 1000 * 60);
407: }
408:
409: public static class Shutdown extends Thread {
410: ManagedChannel channel = null;
411:
412: public Shutdown(ManagedChannel channel) {
413: this .channel = channel;
414: }
415:
416: public void run() {
417: System.out.println("Shutting down...");
418: SystemExit exit = new SystemExit(5000);
419: exit.setDaemon(true);
420: exit.start();
421: try {
422: channel.stop(channel.DEFAULT);
423:
424: } catch (Exception x) {
425: x.printStackTrace();
426: }
427: System.out.println("Channel stopped.");
428: }
429: }
430:
431: public static class SystemExit extends Thread {
432: private long delay;
433:
434: public SystemExit(long delay) {
435: this .delay = delay;
436: }
437:
438: public void run() {
439: try {
440: Thread.sleep(delay);
441: } catch (Exception x) {
442: x.printStackTrace();
443: }
444: System.exit(0);
445:
446: }
447: }
448:
449: }
|