001: package org.jgroups.debug;
002:
003: import org.jgroups.Address;
004: import org.jgroups.Event;
005: import org.jgroups.Message;
006: import org.jgroups.View;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.stack.ProtocolStack;
009: import org.jgroups.util.Queue;
010: import org.jgroups.util.QueueClosedException;
011:
012: import java.util.HashMap;
013: import java.util.Iterator;
014:
015: /**
016: * Tests one or more protocols independently. Look at org.jgroups.tests.FCTest for an example of how to use it.
017: * @author Bela Ban
018: * @version $Id: Simulator.java,v 1.6 2005/08/22 14:12:53 belaban Exp $
019: */
020: public class Simulator {
021: private Protocol[] protStack = null;
022: private ProtocolAdapter ad = new ProtocolAdapter();
023: ProtocolStack prot_stack = null;
024: private Receiver r = null;
025: private Protocol top = null, bottom = null;
026: private Queue send_queue = new Queue();
027: private Thread send_thread;
028: private Queue recv_queue = new Queue();
029: private Thread recv_thread;
030:
031: /** HashMap from Address to Simulator. */
032: private final HashMap addrTable = new HashMap();
033: private Address local_addr = null;
034: private View view;
035:
036: public interface Receiver {
037: void receive(Event evt);
038: }
039:
040: public void setProtocolStack(Protocol[] stack) {
041: this .protStack = stack;
042: this .protStack[0].setUpProtocol(ad);
043: this .protStack[this .protStack.length - 1].setDownProtocol(ad);
044: top = protStack[0];
045: bottom = this .protStack[this .protStack.length - 1];
046:
047: prot_stack = new ProtocolStack();
048:
049: if (protStack.length > 1) {
050: for (int i = 0; i < protStack.length; i++) {
051: Protocol p1 = protStack[i];
052: p1.setProtocolStack(prot_stack);
053: Protocol p2 = i + 1 >= protStack.length ? null
054: : protStack[i + 1];
055: if (p2 != null) {
056: p1.setDownProtocol(p2);
057: p2.setUpProtocol(p1);
058: }
059: }
060: }
061: }
062:
063: public String dumpStats() {
064: StringBuffer sb = new StringBuffer();
065: for (int i = 0; i < protStack.length; i++) {
066: Protocol p1 = protStack[i];
067: sb.append(p1.getName()).append(":\n")
068: .append(p1.dumpStats()).append("\n");
069: }
070: return sb.toString();
071: }
072:
073: public void addMember(Address addr) {
074: addMember(addr, this );
075: }
076:
077: public void addMember(Address addr, Simulator s) {
078: addrTable.put(addr, s);
079: }
080:
081: public void setLocalAddress(Address addr) {
082: this .local_addr = addr;
083: }
084:
085: public void setView(View v) {
086: this .view = v;
087: }
088:
089: public void setReceiver(Receiver r) {
090: this .r = r;
091: }
092:
093: public void send(Event evt) {
094: top.down(evt);
095: }
096:
097: public void receive(Event evt) {
098: try {
099: Event copy;
100: if (evt.getType() == Event.MSG && evt.getArg() != null) {
101: copy = new Event(Event.MSG, ((Message) evt.getArg())
102: .copy());
103: } else
104: copy = evt;
105:
106: recv_queue.add(copy);
107: } catch (QueueClosedException e) {
108: }
109: }
110:
111: public void start() throws Exception {
112: if (local_addr == null)
113: throw new Exception("local_addr has to be non-null");
114: if (protStack == null)
115: throw new Exception("protocol stack is null");
116:
117: bottom.up(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
118: if (view != null) {
119: Event view_evt = new Event(Event.VIEW_CHANGE, view);
120: bottom.up(view_evt);
121: top.down(view_evt);
122: }
123:
124: for (int i = 0; i < protStack.length; i++) {
125: Protocol p = protStack[i];
126: p.setProtocolStack(prot_stack);
127: }
128:
129: for (int i = 0; i < protStack.length; i++) {
130: Protocol p = protStack[i];
131: p.init();
132: }
133:
134: for (int i = 0; i < protStack.length; i++) {
135: Protocol p = protStack[i];
136: p.start();
137: }
138:
139: send_thread = new Thread() {
140: public void run() {
141: Event evt;
142: while (send_thread != null) {
143: try {
144: evt = (Event) send_queue.remove();
145: if (evt.getType() == Event.MSG) {
146: Message msg = (Message) evt.getArg();
147: Address dst = msg.getDest();
148: if (msg.getSrc() == null)
149: ((Message) evt.getArg())
150: .setSrc(local_addr);
151: Simulator s;
152: if (dst == null) {
153: for (Iterator it = addrTable.values()
154: .iterator(); it.hasNext();) {
155: s = (Simulator) it.next();
156: s.receive(evt);
157: }
158: } else {
159: s = (Simulator) addrTable.get(dst);
160: if (s != null)
161: s.receive(evt);
162: }
163: }
164: } catch (QueueClosedException e) {
165: send_thread = null;
166: break;
167: }
168: }
169: }
170: };
171: send_thread.start();
172:
173: recv_thread = new Thread() {
174: public void run() {
175: Event evt;
176: while (recv_thread != null) {
177: try {
178: evt = (Event) recv_queue.remove();
179: bottom.up(evt);
180: } catch (QueueClosedException e) {
181: recv_thread = null;
182: break;
183: }
184: }
185: }
186: };
187: recv_thread.start();
188: }
189:
190: public void stop() {
191: recv_thread = null;
192: recv_queue.close(false);
193: send_thread = null;
194: send_queue.close(false);
195: }
196:
197: class ProtocolAdapter extends Protocol {
198:
199: public String getName() {
200: return "ProtocolAdapter";
201: }
202:
203: public void up(Event evt) {
204: if (r != null)
205: r.receive(evt);
206: }
207:
208: /** send to unicast or multicast destination */
209: public void down(Event evt) {
210: try {
211: send_queue.add(evt);
212: } catch (QueueClosedException e) {
213: }
214: }
215: }
216:
217: }
|