001: // $Id: PERF_TP.java,v 1.12.6.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Message;
008: import org.jgroups.stack.Protocol;
009:
010: /**
011: * Measures the time for a message to travel from the channel to the transport
012: * @author Bela Ban
013: * @version $Id: PERF_TP.java,v 1.12.6.1 2007/04/27 08:03:51 belaban Exp $
014: */
015: public class PERF_TP extends Protocol {
016: private Address local_addr = null;
017: static volatile PERF_TP instance = null;
018: long stop, start;
019: long num_msgs = 0;
020: long expected_msgs = 0;
021: boolean done = false;
022:
023: public static PERF_TP getInstance() {
024: return instance;
025: }
026:
027: public PERF_TP() {
028: if (instance == null)
029: instance = this ;
030: }
031:
032: public String toString() {
033: return "Protocol PERF_TP (local address: " + local_addr + ')';
034: }
035:
036: public boolean done() {
037: return done;
038: }
039:
040: public long getNumMessages() {
041: return num_msgs;
042: }
043:
044: public void setExpectedMessages(long m) {
045: expected_msgs = m;
046: num_msgs = 0;
047: done = false;
048: start = System.currentTimeMillis();
049: }
050:
051: public void reset() {
052: num_msgs = expected_msgs = stop = start = 0;
053: done = false;
054: }
055:
056: public long getTotalTime() {
057: return stop - start;
058: }
059:
060: /*------------------------------ Protocol interface ------------------------------ */
061:
062: public String getName() {
063: return "PERF_TP";
064: }
065:
066: public void init() throws Exception {
067: local_addr = new org.jgroups.stack.IpAddress("localhost", 10000); // fake address
068: }
069:
070: public void start() throws Exception {
071: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
072: }
073:
074: /**
075: * Caller by the layer above this layer. Usually we just put this Message
076: * into the send queue and let one or more worker threads handle it. A worker thread
077: * then removes the Message from the send queue, performs a conversion and adds the
078: * modified Message to the send queue of the layer below it, by calling Down).
079: */
080: public void down(Event evt) {
081: Message msg;
082: Address dest_addr;
083:
084: switch (evt.getType()) {
085:
086: case Event.MSG:
087: if (done) {
088: break;
089: }
090: msg = (Message) evt.getArg();
091: dest_addr = msg.getDest();
092: if (dest_addr == null)
093: num_msgs++;
094: if (num_msgs >= expected_msgs) {
095: stop = System.currentTimeMillis();
096: synchronized (this ) {
097: done = true;
098: this .notifyAll();
099: }
100: if (log.isInfoEnabled())
101: log.info("all done (num_msgs=" + num_msgs
102: + ", expected_msgs=" + expected_msgs);
103: }
104: break;
105:
106: case Event.CONNECT:
107: passUp(new Event(Event.CONNECT_OK));
108: return;
109:
110: case Event.DISCONNECT:
111: passUp(new Event(Event.DISCONNECT_OK));
112: return;
113: }
114:
115: if (down_prot != null)
116: passDown(evt);
117: }
118:
119: public void up(Event evt) {
120: Message msg;
121: Address dest_addr;
122: switch (evt.getType()) {
123:
124: case Event.MSG:
125: if (done) {
126: if (log.isWarnEnabled())
127: log.warn("all done (discarding msg)");
128: break;
129: }
130: msg = (Message) evt.getArg();
131: dest_addr = msg.getDest();
132: if (dest_addr == null)
133: num_msgs++;
134: if (num_msgs >= expected_msgs) {
135: stop = System.currentTimeMillis();
136: synchronized (this ) {
137: done = true;
138: this .notifyAll();
139: }
140: if (log.isInfoEnabled())
141: log.info("all done (num_msgs=" + num_msgs
142: + ", expected_msgs=" + expected_msgs);
143: }
144: return;
145: }
146: passUp(evt);
147: }
148:
149: /*--------------------------- End of Protocol interface -------------------------- */
150:
151: }
|