Source Code Cross Referenced for Test.java in » Net » JGroups-2.4.1-sp3 » org » jgroups » tests » perf » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.tests.perf 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.tests.perf;
002:
003:        import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
004:        import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
005:        import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
006:        import org.apache.commons.logging.Log;
007:        import org.apache.commons.logging.LogFactory;
008:        import org.jgroups.Version;
009:        import org.jgroups.util.Util;
010:
011:        import java.io.BufferedReader;
012:        import java.io.FileReader;
013:        import java.io.FileWriter;
014:        import java.io.IOException;
015:        import java.text.NumberFormat;
016:        import java.util.*;
017:
018:        /**  You start the test by running this class.
019:         * @author Bela Ban (belaban@yahoo.com)
020:
021:         */
022:        public class Test implements  Receiver {
023:            String props = null;
024:            Properties config;
025:            boolean sender = false;
026:            Transport transport = null;
027:            Object local_addr = null;
028:
029:            /** Map<Object,MemberInfo> of senders. Keys=sender addresses, value=MemberInfo */
030:            Map senders = new ConcurrentReaderHashMap(10);
031:
032:            /** Keeps track of members. ArrayList<SocketAddress> */
033:            final ArrayList members = new ArrayList();
034:
035:            /** Set when first message is received */
036:            long start = 0;
037:
038:            /** Set when last message is received */
039:            long stop = 0;
040:
041:            int num_members = 0;
042:            int num_senders = 0;
043:            long num_msgs_expected = 0;
044:
045:            long num_msgs_received = 0; // from everyone
046:            long num_bytes_received = 0; // from everyone
047:
048:            Log log = LogFactory.getLog(getClass());
049:
050:            boolean all_received = false;
051:            boolean final_results_received = false;
052:
053:            /** Map<Object, MemberInfo>. Keys=member addresses, value=the MemberInfo result reported by that member */
054:            Map results = new HashMap();
055:
056:            private ResultsPublisher publisher = new ResultsPublisher();
057:
058:            List heard_from = new ArrayList();
059:
060:            boolean dump_transport_stats = false;
061:
062:            /** Log every n msgs received */
063:            long log_interval = 1000;
064:
065:            long counter = 1;
066:            long msg_size = 1000;
067:            boolean jmx = false;
068:
069:            /** Number of ms to wait at the receiver to simulate processing of the received message (0 == don't wait) */
070:            long processing_delay = 0;
071:
072:            FileWriter output = null;
073:
074:            QueuedExecutor response_sender = new QueuedExecutor();
075:
076:            static NumberFormat f;
077:
078:            static {
079:                f = NumberFormat.getNumberInstance();
080:                f.setGroupingUsed(false);
081:                f.setMaximumFractionDigits(2);
082:            }
083:
084:            public void start(Properties c, boolean verbose, boolean jmx,
085:                    String output) throws Exception {
086:                String config_file = "config.txt";
087:                BufferedReader fileReader;
088:                String line;
089:                String key, val;
090:                StringTokenizer st;
091:                Properties tmp = new Properties();
092:
093:                if (output != null)
094:                    this .output = new FileWriter(output, false);
095:
096:                response_sender.setThreadFactory(new ThreadFactory() {
097:                    public Thread newThread(Runnable runnable) {
098:                        return new Thread(runnable, "Test.ResponseSender");
099:                    }
100:                });
101:
102:                config_file = c.getProperty("config");
103:                fileReader = new BufferedReader(new FileReader(config_file));
104:                while ((line = fileReader.readLine()) != null) {
105:                    if (line.startsWith("#"))
106:                        continue;
107:                    line = line.trim();
108:                    if (line.length() == 0)
109:                        continue;
110:                    st = new StringTokenizer(line, "=", false);
111:                    key = st.nextToken().toLowerCase();
112:                    val = st.nextToken();
113:                    tmp.put(key, val);
114:                }
115:                fileReader.close();
116:
117:                // 'tmp' now contains all properties from the file, now we need to override the ones
118:                // passed to us by 'c'
119:                tmp.putAll(c);
120:                this .config = tmp;
121:
122:                StringBuffer sb = new StringBuffer();
123:                sb
124:                        .append("\n\n----------------------- TEST -----------------------\n");
125:                sb.append("Date: ").append(new Date()).append('\n');
126:                sb.append("Run by: ").append(System.getProperty("user.name"))
127:                        .append("\n\n");
128:                if (verbose)
129:                    sb.append("Properties: ").append(printProperties()).append(
130:                            "\n-------------------------\n\n");
131:
132:                for (Iterator it = this .config.entrySet().iterator(); it
133:                        .hasNext();) {
134:                    Map.Entry entry = (Map.Entry) it.next();
135:                    sb.append(entry.getKey()).append(":\t").append(
136:                            entry.getValue()).append('\n');
137:                }
138:                sb.append("JGroups version: ").append(Version.description)
139:                        .append('\n');
140:                System.out.println("Configuration is: " + sb);
141:
142:                output(sb.toString());
143:
144:                props = this .config.getProperty("props");
145:                num_members = Integer.parseInt(this .config
146:                        .getProperty("num_members"));
147:                num_senders = Integer.parseInt(this .config
148:                        .getProperty("num_senders"));
149:                long num_msgs = Long.parseLong(this .config
150:                        .getProperty("num_msgs"));
151:                this .num_msgs_expected = num_senders * num_msgs;
152:                sender = Boolean.valueOf(this .config.getProperty("sender"))
153:                        .booleanValue();
154:                msg_size = Long.parseLong(this .config.getProperty("msg_size"));
155:                String tmp2 = this .config.getProperty("dump_transport_stats",
156:                        "false");
157:                if (Boolean.valueOf(tmp2).booleanValue())
158:                    this .dump_transport_stats = true;
159:                tmp2 = this .config.getProperty("log_interval");
160:                if (tmp2 != null)
161:                    log_interval = Long.parseLong(tmp2);
162:
163:                sb = new StringBuffer();
164:                sb.append("\n##### msgs_received");
165:                sb.append(", current time (in ms)");
166:                sb.append(", msgs/sec");
167:                sb.append(", throughput/sec [KB]");
168:                sb.append(", free_mem [KB] ");
169:                sb.append(", total_mem [KB] ");
170:                output(sb.toString());
171:
172:                if (jmx) {
173:                    this .config.setProperty("jmx", "true");
174:                }
175:                this .jmx = new Boolean(this .config.getProperty("jmx"))
176:                        .booleanValue();
177:
178:                String tmp3 = this .config.getProperty("processing_delay");
179:                if (tmp3 != null)
180:                    this .processing_delay = Long.parseLong(tmp3);
181:
182:                String transport_name = this .config.getProperty("transport");
183:                transport = (Transport) Util.loadClass(transport_name,
184:                        this .getClass()).newInstance();
185:                transport.create(this .config);
186:                transport.setReceiver(this );
187:                transport.start();
188:                local_addr = transport.getLocalAddress();
189:            }
190:
191:            private void output(String msg) {
192:                // if(log.isInfoEnabled())
193:                // log.info(msg);
194:                if (this .output != null) {
195:                    try {
196:                        this .output.write(msg + "\n");
197:                        this .output.flush();
198:                    } catch (IOException e) {
199:                    }
200:                }
201:            }
202:
203:            private String printProperties() {
204:                StringBuffer sb = new StringBuffer();
205:                Properties p = System.getProperties();
206:                for (Iterator it = p.entrySet().iterator(); it.hasNext();) {
207:                    Map.Entry entry = (Map.Entry) it.next();
208:                    sb.append(entry.getKey()).append(": ").append(
209:                            entry.getValue()).append('\n');
210:                }
211:                return sb.toString();
212:            }
213:
214:            public void stop() {
215:                if (transport != null) {
216:                    transport.stop();
217:                    transport.destroy();
218:                }
219:                if (response_sender != null) {
220:                    response_sender.shutdownNow();
221:                }
222:                if (this .output != null) {
223:                    try {
224:                        this .output.close();
225:                    } catch (IOException e) {
226:                    }
227:                }
228:            }
229:
230:            public void receive(Object sender, byte[] payload) {
231:                if (payload == null || payload.length == 0) {
232:                    System.err.println("payload is incorrect (sender=" + sender
233:                            + "): " + payload);
234:                    return;
235:                }
236:
237:                try {
238:                    int type = payload[0];
239:                    if (type == 1) { // DATA
240:                        int len = payload.length - 1;
241:                        handleData(sender, len);
242:                        return;
243:                    }
244:
245:                    byte[] tmp = new byte[payload.length - 1];
246:                    System.arraycopy(payload, 1, tmp, 0, tmp.length);
247:                    Data d = (Data) Util.streamableFromByteBuffer(Data.class,
248:                            tmp);
249:
250:                    switch (d.getType()) {
251:                    case Data.DISCOVERY_REQ:
252:                        // System.out.println("-- received discovery request");
253:                        sendDiscoveryResponse();
254:                        break;
255:                    case Data.DISCOVERY_RSP:
256:                        // System.out.println("-- received discovery response from " + sender);
257:                        synchronized (this .members) {
258:                            if (!this .members.contains(sender)) {
259:                                this .members.add(sender);
260:                                System.out.println("-- " + sender + " joined");
261:                                if (d.sender) {
262:                                    synchronized (this .members) {
263:                                        if (!this .senders.containsKey(sender)) {
264:                                            this .senders.put(sender,
265:                                                    new MemberInfo(d.num_msgs));
266:                                        }
267:                                    }
268:                                }
269:                                this .members.notifyAll();
270:                            }
271:                        }
272:                        break;
273:
274:                    case Data.FINAL_RESULTS:
275:                        publisher.stop();
276:                        if (!final_results_received) {
277:                            dumpResults(d.results);
278:                            final_results_received = true;
279:                        }
280:                        synchronized (this ) {
281:                            this .notifyAll();
282:                        }
283:                        break;
284:
285:                    case Data.RESULTS:
286:                        results.put(sender, d.result);
287:                        heard_from.remove(sender);
288:                        if (heard_from.size() == 0) {
289:                            for (int i = 0; i < 3; i++) {
290:                                sendFinalResults();
291:                                Util.sleep(100);
292:                            }
293:                        }
294:                        break;
295:
296:                    default:
297:                        log.error("received invalid data type: " + payload[0]);
298:                        break;
299:                    }
300:                } catch (Exception e) {
301:                    e.printStackTrace();
302:                }
303:            }
304:
305:            private void handleData(Object sender, int num_bytes) {
306:                if (all_received)
307:                    return;
308:                if (start == 0) {
309:                    start = System.currentTimeMillis();
310:                }
311:
312:                num_msgs_received++;
313:                num_bytes_received += num_bytes;
314:
315:                if (num_msgs_received >= num_msgs_expected) {
316:                    if (stop == 0)
317:                        stop = System.currentTimeMillis();
318:                    all_received = true;
319:                }
320:
321:                if (num_msgs_received % log_interval == 0)
322:                    System.out.println(new StringBuffer("-- received ").append(
323:                            num_msgs_received).append(" messages"));
324:
325:                if (counter % log_interval == 0) {
326:                    output(dumpStats(counter));
327:                }
328:
329:                MemberInfo info = (MemberInfo) this .senders.get(sender);
330:                if (info != null) {
331:                    if (info.start == 0)
332:                        info.start = System.currentTimeMillis();
333:                    info.num_msgs_received++;
334:                    counter++;
335:                    info.total_bytes_received += num_bytes;
336:                    if (info.num_msgs_received >= info.num_msgs_expected) {
337:                        info.done = true;
338:                        if (info.stop == 0)
339:                            info.stop = System.currentTimeMillis();
340:                    } else {
341:                        if (processing_delay > 0)
342:                            Util.sleep(processing_delay);
343:                    }
344:                } else {
345:                    log.error("-- sender " + sender
346:                            + " not found in senders hashmap");
347:                }
348:
349:                if (all_received) {
350:                    if (!this .sender)
351:                        dumpSenders();
352:                    publisher.start();
353:                }
354:            }
355:
356:            private void sendResults() throws Exception {
357:                Data d = new Data(Data.RESULTS);
358:                byte[] buf;
359:                MemberInfo info = new MemberInfo(num_msgs_expected);
360:                info.done = true;
361:                info.num_msgs_received = num_msgs_received;
362:                info.start = start;
363:                info.stop = stop;
364:                info.total_bytes_received = this .num_bytes_received;
365:                d.result = info;
366:                buf = generatePayload(d, null);
367:                transport.send(null, buf);
368:            }
369:
370:            private void sendFinalResults() throws Exception {
371:                Data d = new Data(Data.FINAL_RESULTS);
372:                d.results = new ConcurrentReaderHashMap(this .results);
373:                final byte[] buf = generatePayload(d, null);
374:                // transport.send(null, buf);
375:
376:                response_sender.execute(new Runnable() {
377:                    public void run() {
378:                        try {
379:                            transport.send(null, buf);
380:                        } catch (Exception e) {
381:                            log.error("failed sending discovery response", e);
382:                        }
383:                    }
384:                });
385:            }
386:
387:            boolean allReceived() {
388:                return all_received;
389:            }
390:
391:            boolean receivedFinalResults() {
392:                return final_results_received;
393:            }
394:
395:            void sendMessages(long interval, int nanos, boolean busy_sleep)
396:                    throws Exception {
397:                long total_msgs = 0;
398:                int msgSize = Integer.parseInt(config.getProperty("msg_size"));
399:                int num_msgs = Integer.parseInt(config.getProperty("num_msgs"));
400:                // int logInterval=Integer.parseInt(config.getProperty("log_interval"));
401:                byte[] buf = new byte[msgSize];
402:                for (int k = 0; k < msgSize; k++)
403:                    buf[k] = '.';
404:                Data d = new Data(Data.DATA);
405:                byte[] payload = generatePayload(d, buf);
406:                System.out.println("-- sending " + num_msgs + " "
407:                        + Util.printBytes(msgSize) + " messages");
408:                for (int i = 0; i < num_msgs; i++) {
409:                    transport.send(null, payload);
410:                    total_msgs++;
411:                    if (total_msgs % log_interval == 0) {
412:                        System.out.println("++ sent " + total_msgs);
413:                    }
414:                    if (interval > 0 || nanos > 0) {
415:                        if (busy_sleep)
416:                            Util.sleep(interval, busy_sleep);
417:                        else
418:                            Util.sleep(interval, nanos);
419:                    }
420:                }
421:            }
422:
423:            byte[] generatePayload(Data d, byte[] buf) throws Exception {
424:                byte[] tmp = buf != null ? buf : Util.streamableToByteBuffer(d);
425:                byte[] payload = new byte[tmp.length + 1];
426:                payload[0] = intToByte(d.getType());
427:                System.arraycopy(tmp, 0, payload, 1, tmp.length);
428:                return payload;
429:            }
430:
431:            private byte intToByte(int type) {
432:                switch (type) {
433:                case Data.DATA:
434:                    return 1;
435:                case Data.DISCOVERY_REQ:
436:                    return 2;
437:                case Data.DISCOVERY_RSP:
438:                    return 3;
439:                case Data.RESULTS:
440:                    return 4;
441:                case Data.FINAL_RESULTS:
442:                    return 5;
443:                default:
444:                    return 0;
445:                }
446:            }
447:
448:            private void dumpResults(Map final_results) {
449:                Object member;
450:                Map.Entry entry;
451:                MemberInfo val;
452:                double combined_msgs_sec, tmp = 0;
453:                long combined_tp;
454:                StringBuffer sb = new StringBuffer();
455:                sb.append("\n-- results:\n");
456:
457:                for (Iterator it = final_results.entrySet().iterator(); it
458:                        .hasNext();) {
459:                    entry = (Map.Entry) it.next();
460:                    member = entry.getKey();
461:                    val = (MemberInfo) entry.getValue();
462:                    tmp += val.getMessageSec();
463:                    sb.append("\n").append(member);
464:                    if (member.equals(local_addr))
465:                        sb.append(" (myself)");
466:                    sb.append(":\n");
467:                    sb.append(val);
468:                    sb.append('\n');
469:                }
470:                combined_msgs_sec = tmp / final_results.size();
471:                combined_tp = (long) combined_msgs_sec * msg_size;
472:
473:                sb.append("\ncombined: ").append(f.format(combined_msgs_sec))
474:                        .append(
475:                                " msgs/sec averaged over all receivers (throughput="
476:                                        + Util.printBytes(combined_tp)
477:                                        + "/sec)\n");
478:                System.out.println(sb.toString());
479:                output(sb.toString());
480:            }
481:
482:            private void dumpSenders() {
483:                StringBuffer sb = new StringBuffer();
484:                dump(this .senders, sb);
485:                System.out.println(sb.toString());
486:            }
487:
488:            private void dump(Map map, StringBuffer sb) {
489:                Map.Entry entry;
490:                Object mySender;
491:                MemberInfo mi;
492:                MemberInfo combined = new MemberInfo(0);
493:                combined.start = Long.MAX_VALUE;
494:                combined.stop = Long.MIN_VALUE;
495:
496:                sb.append("\n-- local results:\n");
497:                for (Iterator it2 = map.entrySet().iterator(); it2.hasNext();) {
498:                    entry = (Map.Entry) it2.next();
499:                    mySender = entry.getKey();
500:                    mi = (MemberInfo) entry.getValue();
501:                    combined.start = Math.min(combined.start, mi.start);
502:                    combined.stop = Math.max(combined.stop, mi.stop);
503:                    combined.num_msgs_expected += mi.num_msgs_expected;
504:                    combined.num_msgs_received += mi.num_msgs_received;
505:                    combined.total_bytes_received += mi.total_bytes_received;
506:                    sb.append("sender: ").append(mySender).append(": ").append(
507:                            mi).append('\n');
508:                }
509:            }
510:
511:            private String dumpStats(long received_msgs) {
512:                double msgs_sec, throughput_sec;
513:                long current;
514:
515:                StringBuffer sb = new StringBuffer();
516:                sb.append(received_msgs).append(' ');
517:
518:                current = System.currentTimeMillis();
519:                sb.append(current).append(' ');
520:
521:                msgs_sec = received_msgs / ((current - start) / 1000.0);
522:                throughput_sec = msgs_sec * msg_size;
523:
524:                sb.append(f.format(msgs_sec)).append(' ').append(
525:                        f.format(throughput_sec)).append(' ');
526:
527:                sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(
528:                        ' ');
529:
530:                sb.append(Runtime.getRuntime().totalMemory() / 1000.0);
531:
532:                if (dump_transport_stats) {
533:                    Map stats = transport.dumpStats();
534:                    if (stats != null) {
535:                        print(stats, sb);
536:                    }
537:                }
538:                return sb.toString();
539:            }
540:
541:            public String dumpTransportStats() {
542:                Map stats = transport.dumpStats();
543:                StringBuffer sb = new StringBuffer(128);
544:                if (stats != null) {
545:                    Map.Entry entry;
546:                    String key;
547:                    Map value;
548:                    for (Iterator it = stats.entrySet().iterator(); it
549:                            .hasNext();) {
550:                        entry = (Map.Entry) it.next();
551:                        key = (String) entry.getKey();
552:                        value = (Map) entry.getValue();
553:                        sb.append("\n").append(key).append(":\n");
554:                        for (Iterator it2 = value.entrySet().iterator(); it2
555:                                .hasNext();) {
556:                            sb.append(it2.next()).append("\n");
557:                        }
558:                    }
559:                }
560:                return sb.toString();
561:            }
562:
563:            private void print(Map stats, StringBuffer sb) {
564:                sb.append("\nTransport stats:\n\n");
565:                Map.Entry entry;
566:                Object key, val;
567:                for (Iterator it = stats.entrySet().iterator(); it.hasNext();) {
568:                    entry = (Map.Entry) it.next();
569:                    key = entry.getKey();
570:                    val = entry.getValue();
571:                    sb.append(key).append(": ").append(val).append("\n");
572:                }
573:            }
574:
575:            void runDiscoveryPhase() throws Exception {
576:                sendDiscoveryRequest();
577:                sendDiscoveryResponse();
578:
579:                synchronized (this .members) {
580:                    System.out.println("-- waiting for " + num_members
581:                            + " members to join");
582:                    while (this .members.size() < num_members) {
583:                        this .members.wait(2000);
584:                        sendDiscoveryRequest();
585:                        sendDiscoveryResponse();
586:                    }
587:
588:                    heard_from.addAll(members);
589:                    System.out.println("-- members: " + this .members.size());
590:                }
591:            }
592:
593:            void sendDiscoveryRequest() throws Exception {
594:                Data d = new Data(Data.DISCOVERY_REQ);
595:                // System.out.println("-- sending discovery request");
596:                transport.send(null, generatePayload(d, null));
597:            }
598:
599:            void sendDiscoveryResponse() throws Exception {
600:                final Data d2 = new Data(Data.DISCOVERY_RSP);
601:                if (sender) {
602:                    d2.sender = true;
603:                    d2.num_msgs = Long
604:                            .parseLong(config.getProperty("num_msgs"));
605:                }
606:
607:                response_sender.execute(new Runnable() {
608:                    public void run() {
609:                        try {
610:                            transport.send(null, generatePayload(d2, null));
611:                        } catch (Exception e) {
612:                            log.error("failed sending discovery response", e);
613:                        }
614:                    }
615:                });
616:
617:            }
618:
619:            public static void main(String[] args) {
620:                Properties config = new Properties();
621:                boolean sender = false, verbose = false, jmx = false, dump_stats = false; // dumps at end of run
622:                Test t = null;
623:                String output = null;
624:                long interval = 0;
625:                int interval_nanos = 0;
626:                boolean busy_sleep = false;
627:
628:                for (int i = 0; i < args.length; i++) {
629:                    if ("-sender".equals(args[i])) {
630:                        config.put("sender", "true");
631:                        sender = true;
632:                        continue;
633:                    }
634:                    if ("-receiver".equals(args[i])) {
635:                        config.put("sender", "false");
636:                        sender = false;
637:                        continue;
638:                    }
639:                    if ("-config".equals(args[i])) {
640:                        String config_file = args[++i];
641:                        config.put("config", config_file);
642:                        continue;
643:                    }
644:                    if ("-props".equals(args[i])) {
645:                        String props = args[++i];
646:                        config.put("props", props);
647:                        continue;
648:                    }
649:                    if ("-verbose".equals(args[i])) {
650:                        verbose = true;
651:                        continue;
652:                    }
653:                    if ("-jmx".equals(args[i])) {
654:                        jmx = true;
655:                        continue;
656:                    }
657:                    if ("-dump_stats".equals(args[i])) {
658:                        dump_stats = true;
659:                        continue;
660:                    }
661:                    if ("-interval".equals(args[i])) {
662:                        interval = Long.parseLong(args[++i]);
663:                        continue;
664:                    }
665:                    if ("-nanos".equals(args[i])) {
666:                        interval_nanos = Integer.parseInt(args[++i]);
667:                        continue;
668:                    }
669:                    if ("-busy_sleep".equals(args[i])) {
670:                        busy_sleep = true;
671:                        continue;
672:                    }
673:                    if ("-f".equals(args[i])) {
674:                        output = args[++i];
675:                        continue;
676:                    }
677:                    help();
678:                    return;
679:                }
680:
681:                try {
682:
683:                    /*int prio=Thread.currentThread().getPriority();
684:                    System.out.println("current thread: " + Thread.currentThread() + ", prio: " + prio);
685:
686:                    Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
687:                    prio=Thread.currentThread().getPriority();
688:                    System.out.println("current thread: " + Thread.currentThread() + ", prio: " + prio);*/
689:
690:                    t = new Test();
691:                    t.start(config, verbose, jmx, output);
692:                    t.runDiscoveryPhase();
693:                    if (sender) {
694:                        t.sendMessages(interval, interval_nanos, busy_sleep);
695:                    }
696:                    synchronized (t) {
697:                        while (t.receivedFinalResults() == false) {
698:                            t.wait(2000);
699:                        }
700:                    }
701:                    if (dump_stats) {
702:                        String stats = t.dumpTransportStats();
703:                        System.out.println("\nTransport statistics:\n" + stats);
704:                    }
705:                    if (t.jmx) {
706:                        System.out.println("jmx=true: not terminating");
707:                        if (t != null) {
708:                            t.stop();
709:                            t = null;
710:                        }
711:                        while (true) {
712:                            Util.sleep(60000);
713:                        }
714:                    }
715:                } catch (Exception e) {
716:                    e.printStackTrace();
717:                } finally {
718:                    if (t != null) {
719:                        t.stop();
720:                    }
721:                }
722:            }
723:
724:            static void help() {
725:                System.out
726:                        .println("Test [-help] ([-sender] | [-receiver]) "
727:                                + "[-config <config file>] "
728:                                + "[-props <stack config>] [-verbose] [-jmx] "
729:                                + "[-dump_stats] [-f <filename>] [-interval <ms between sends>] "
730:                                + "[-nanos <additional nanos to sleep in interval>] [-busy_sleep (cancels out -nanos)]");
731:            }
732:
733:            private class ResultsPublisher implements  Runnable {
734:                final long interval = 1000;
735:                boolean running = true;
736:                Thread t;
737:
738:                void start() {
739:                    if (t == null) {
740:                        t = new Thread(this , "ResultsPublisher");
741:                        t.setDaemon(true);
742:                        t.start();
743:                    }
744:                }
745:
746:                void stop() {
747:                    if (t != null && t.isAlive()) {
748:                        Thread tmp = t;
749:                        t = null;
750:                        tmp.interrupt();
751:                    }
752:                }
753:
754:                public void run() {
755:                    try {
756:                        while (t != null) {
757:                            sendResults();
758:                            Util.sleep(interval);
759:                        }
760:                    } catch (Exception e) {
761:                        e.printStackTrace();
762:                    }
763:                }
764:            }
765:
766:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.