001: package org.jgroups.protocols;
002:
003: import org.jgroups.Event;
004: import org.jgroups.Message;
005: import org.jgroups.stack.Protocol;
006:
007: import java.util.*;
008:
009: /**
010: * This layer shuffles upcoming messages, put it just above your bottom layer.
011: * If you system sends less than 2 messages per sec you can notice a latency due
012: * to this layer.
013: *
014: * @author Gianluca Collot
015: *
016: */
017:
018: public class SHUFFLE extends Protocol implements Runnable {
019:
020: String name = "SHUFFLE";
021: final List messages;
022: Thread messagesHandler;
023:
024: public SHUFFLE() {
025: messages = Collections.synchronizedList(new ArrayList());
026: }
027:
028: public String getName() {
029: return name;
030: }
031:
032: public boolean setProperties(Properties props) {
033: String str;
034:
035: super .setProperties(props);
036: str = props.getProperty("name");
037: if (str != null) {
038: name = str;
039: props.remove("name");
040: }
041:
042: if (props.size() > 0) {
043: log
044: .error("DUMMY.setProperties(): these properties are not recognized: "
045: + props);
046:
047: return false;
048: }
049: return true;
050: }
051:
052: /**
053: * Adds upcoming messages to the <code>messages List<\code> where the <code>messagesHandler<\code>
054: * retrieves them.
055: */
056:
057: public void up(Event evt) {
058: Message msg;
059:
060: switch (evt.getType()) {
061:
062: case Event.MSG:
063: msg = (Message) evt.getArg();
064: // Do something with the event, e.g. extract the message and remove a header.
065: // Optionally pass up
066: messages.add(msg);
067: return;
068: }
069:
070: passUp(evt); // Pass up to the layer above us
071: }
072:
073: /**
074: * Starts the <code>messagesHandler<\code>
075: */
076: public void start() throws Exception {
077: messagesHandler = new Thread(this , "MessagesHandler");
078: messagesHandler.setDaemon(true);
079: messagesHandler.start();
080: }
081:
082: /**
083: * Stops the messagesHandler
084: */
085: public void stop() {
086: Thread tmp = messagesHandler;
087: messagesHandler = null;
088: try {
089: tmp.join();
090: } catch (Exception ex) {
091: ex.printStackTrace();
092: }
093: }
094:
095: /**
096: * Removes a random chosen message from the <code>messages List<\code> if there
097: * are less than 10 messages in the List it waits some time to ensure to chose from
098: * a set of messages > 1.
099: */
100:
101: public void run() {
102: Message msg;
103: while (messagesHandler != null) {
104: if (messages.size() > 0) {
105: msg = (Message) messages.remove(rnd(messages.size()));
106: passUp(new Event(Event.MSG, msg));
107: }
108: if (messages.size() < 5) {
109: try {
110: Thread.sleep(300);
111: /** @todo make this time user configurable */
112: } catch (Exception ex) {
113: ex.printStackTrace();
114: }
115: }
116: }// while
117: // PassUp remaining messages
118: Iterator iter = messages.iterator();
119: while (iter.hasNext()) {
120: msg = (Message) iter.next();
121: passUp(new Event(Event.MSG, msg));
122: }
123: }
124:
125: // random integer between 0 and n-1
126: int rnd(int n) {
127: return (int) (Math.random() * n);
128: }
129:
130: }
|