001: // $Id: PARTITIONER.java,v 1.5 2005/05/30 14:31:07 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Header;
008: import org.jgroups.Message;
009: import org.jgroups.stack.Protocol;
010:
011: import java.io.IOException;
012: import java.io.ObjectInput;
013: import java.io.ObjectOutput;
014: import java.util.Hashtable;
015: import java.util.Properties;
016: import java.util.Vector;
017:
018: /**
019: * This layer can be put on top of the bottommost layer and is useful to simulate partitions.
020: * It simply adds a header with its partition number and discards Messages with other partition numbers.<br>
021: * If it receives an Event of type Event.SET_PARTITIONS it sends a Header of type COMMAND with the Hashtable
022: * contained in the Event argument to set the partitions of ALL processes (not just processes of the current view but
023: * every process with the same group address that receives the message.
024: */
025:
026: public class PARTITIONER extends Protocol {
027: final Vector members = new Vector();
028: Address local_addr = null;
029: int my_partition = 1;
030:
031: /** All protocol names have to be unique ! */
032: public String getName() {
033: return "PARTITIONER";
034: }
035:
036: public boolean setProperties(Properties props) {
037: String str;
038:
039: super .setProperties(props);
040: if (props.size() > 0) {
041: log
042: .error("EXAMPLE.setProperties(): these properties are not recognized: "
043: + props);
044:
045: return false;
046: }
047: return true;
048: }
049:
050: /** Just remove if you don't need to reset any state */
051: public void reset() {
052: }
053:
054: /**
055: * Discards Messages with the wrong partition number and sets local partition number if
056: * it receives a COMMAND Header
057: */
058:
059: public void up(Event evt) {
060: Message msg;
061: Integer num;
062: PartitionerHeader partHead = null;
063:
064: switch (evt.getType()) {
065:
066: case Event.SET_LOCAL_ADDRESS:
067: local_addr = (Address) evt.getArg();
068: if (log.isInfoEnabled())
069: log.info("local address is " + local_addr);
070: break;
071:
072: case Event.MSG:
073: msg = (Message) evt.getArg();
074: partHead = (PartitionerHeader) msg.removeHeader(getName());
075: if (partHead.type == PartitionerHeader.COMMAND) {
076: num = (Integer) partHead.Destinations.get(local_addr);
077: if (num == null)
078: return;
079: if (log.isInfoEnabled())
080: log.info("new partition = " + num);
081: my_partition = num.intValue();
082: return;
083: }
084: if (partHead.type == PartitionerHeader.NORMAL
085: && partHead.partition != my_partition)
086: return;
087: break;
088: }
089:
090: passUp(evt); // Pass up to the layer above us
091: }
092:
093: /**
094: * Adds to Messages a Header with the local partitin number and if receives a SET_PARTITIONS Event sends
095: * a new Message with a PartitionerHeader set to COMMAND that carries the Hashtable
096: */
097:
098: public void down(Event evt) {
099: Message msg;
100: Event newEvent;
101: PartitionerHeader partHeader;
102:
103: switch (evt.getType()) {
104:
105: case Event.SET_PARTITIONS:
106: //Sends a partitioning message
107: if (log.isInfoEnabled())
108: log.info("SET_PARTITIONS received, argument "
109: + evt.getArg().toString());
110: msg = new Message(null, null, null);
111: partHeader = new PartitionerHeader(
112: PartitionerHeader.COMMAND);
113: partHeader.Destinations = (Hashtable) evt.getArg();
114: msg.putHeader(getName(), partHeader);
115: passDown(new Event(Event.MSG, msg));
116: break;
117:
118: case Event.MSG:
119: msg = (Message) evt.getArg();
120: msg.putHeader(getName(), new PartitionerHeader(
121: PartitionerHeader.NORMAL, my_partition));
122: // Do something with the event, e.g. add a header to the message
123: // Optionally pass down
124: break;
125: }
126:
127: passDown(evt); // Pass on to the layer below us
128: }
129:
130: /**
131: * The Partitioner header normally (type = NORMAL) contains just the partition number that is checked to discard messages
132: * received from other partitions.
133: * If type is COMMAND Destination contains an Hashtable where keys are of type Address and represent process (channel)
134: * addresses and values are Integer representing the partition that shuold be assigned to each Address.
135: */
136:
137: public static class PartitionerHeader extends Header {
138: // your variables
139: static final int NORMAL = 0; //normal header (do nothing)
140: static final int COMMAND = 1; //set partition vector
141: int type = 0, partition = 1;
142: Hashtable Destinations = null;
143:
144: public PartitionerHeader() {
145: } // used for externalization
146:
147: public PartitionerHeader(int type) {
148: this .type = type;
149: }
150:
151: public PartitionerHeader(int type, int partition) {
152: this .type = type;
153: this .partition = partition;
154: }
155:
156: public String toString() {
157: switch (type) {
158: case NORMAL:
159: return "NORMAL ->partition :" + partition;
160: case COMMAND:
161: return "COMMAND ->hashtable :" + Destinations;
162: default:
163: return "<unknown>";
164:
165: }
166: }
167:
168: public void writeExternal(ObjectOutput out) throws IOException {
169: out.writeInt(type);
170: out.writeInt(partition);
171: out.writeObject(Destinations);
172: }
173:
174: public void readExternal(ObjectInput in) throws IOException,
175: ClassNotFoundException {
176: type = in.readInt();
177: partition = in.readInt();
178: Destinations = (Hashtable) in.readObject();
179: }
180: }
181:
182: }
|