001: // $Id: HTOTAL.java,v 1.4.10.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.util.Streamable;
008: import org.jgroups.util.Util;
009:
010: import java.io.*;
011: import java.util.Properties;
012: import java.util.Vector;
013:
014: /**
015: * Implementation of UTO-TCP as designed by EPFL. Implements chaining algorithm: each sender sends the message
016: * to a coordinator who then forwards it to its neighbor on the right, who then forwards it to its neighbor to the right
017: * etc.
018: * @author Bela Ban
019: * @version $Id: HTOTAL.java,v 1.4.10.1 2007/04/27 08:03:51 belaban Exp $
020: */
021: public class HTOTAL extends Protocol {
022: Address coord = null;
023: Address neighbor = null; // to whom do we forward the message (member to the right, or null if we're at the tail)
024: Address local_addr = null;
025: Vector mbrs = new Vector();
026: boolean is_coord = false;
027: private boolean use_multipoint_forwarding = false;
028:
029: public HTOTAL() {
030: }
031:
032: public final String getName() {
033: return "HTOTAL";
034: }
035:
036: public boolean setProperties(Properties props) {
037: String str;
038:
039: super .setProperties(props);
040: str = props.getProperty("use_multipoint_forwarding");
041: if (str != null) {
042: use_multipoint_forwarding = Boolean.valueOf(str)
043: .booleanValue();
044: props.remove("use_multipoint_forwarding");
045: }
046:
047: if (props.size() > 0) {
048: log
049: .error("TCP.setProperties(): the following properties are not recognized: "
050: + props);
051:
052: return false;
053: }
054: return true;
055: }
056:
057: public void down(Event evt) {
058: switch (evt.getType()) {
059: case Event.VIEW_CHANGE:
060: determineCoordinatorAndNextMember((View) evt.getArg());
061: break;
062: case Event.MSG:
063: Message msg = (Message) evt.getArg();
064: Address dest = msg.getDest();
065: if (dest == null || dest.isMulticastAddress()) { // only process multipoint messages
066: if (coord == null)
067: log
068: .error("coordinator is null, cannot send message to coordinator");
069: else {
070: msg.setSrc(local_addr);
071: forwardTo(coord, msg);
072: }
073: return; // handled here, don't pass down by default
074: }
075: break;
076: }
077: passDown(evt);
078: }
079:
080: public void up(Event evt) {
081: switch (evt.getType()) {
082: case Event.SET_LOCAL_ADDRESS:
083: local_addr = (Address) evt.getArg();
084: break;
085: case Event.VIEW_CHANGE:
086: determineCoordinatorAndNextMember((View) evt.getArg());
087: break;
088: case Event.MSG:
089: Message msg = (Message) evt.getArg();
090: HTotalHeader hdr = (HTotalHeader) msg.getHeader(getName());
091:
092: if (hdr == null)
093: break; // probably a unicast message, just pass it up
094:
095: Message copy = msg.copy(false); // do not copy the buffer
096: if (use_multipoint_forwarding) {
097: copy.setDest(null);
098: passDown(new Event(Event.MSG, copy));
099: } else {
100: if (neighbor != null) {
101: forwardTo(neighbor, copy);
102: }
103: }
104:
105: msg.setDest(hdr.dest); // set destination to be the original destination
106: msg.setSrc(hdr.src); // set sender to be the original sender (important for retransmission requests)
107:
108: passUp(evt); // <-- we modify msg directly inside evt
109: return;
110: }
111: passUp(evt);
112: }
113:
114: private void forwardTo(Address destination, Message msg) {
115: HTotalHeader hdr = (HTotalHeader) msg.getHeader(getName());
116:
117: if (hdr == null) {
118: hdr = new HTotalHeader(msg.getDest(), msg.getSrc());
119: msg.putHeader(getName(), hdr);
120: }
121: msg.setDest(destination);
122: if (log.isTraceEnabled())
123: log.trace("forwarding message to " + destination + ", hdr="
124: + hdr);
125: passDown(new Event(Event.MSG, msg));
126: }
127:
128: private void determineCoordinatorAndNextMember(View v) {
129: Object tmp;
130: Address retval = null;
131:
132: mbrs.clear();
133: mbrs.addAll(v.getMembers());
134:
135: coord = (Address) (mbrs != null && mbrs.size() > 0 ? mbrs
136: .firstElement() : null);
137: is_coord = coord != null && local_addr != null
138: && coord.equals(local_addr);
139:
140: if (mbrs == null || mbrs.size() < 2 || local_addr == null)
141: neighbor = null;
142: else {
143: for (int i = 0; i < mbrs.size(); i++) {
144: tmp = mbrs.elementAt(i);
145: if (local_addr.equals(tmp)) {
146: if (i + 1 >= mbrs.size())
147: retval = null; // we don't wrap, last member is null
148: else
149: retval = (Address) mbrs.elementAt(i + 1);
150: break;
151: }
152: }
153: }
154: neighbor = retval;
155: if (log.isTraceEnabled())
156: log.trace("coord=" + coord + ", neighbor=" + neighbor);
157: }
158:
159: public static class HTotalHeader extends Header implements
160: Streamable {
161: Address dest, src;
162:
163: public HTotalHeader() {
164: }
165:
166: public HTotalHeader(Address dest, Address src) {
167: this .dest = dest;
168: this .src = src;
169: }
170:
171: public void writeExternal(ObjectOutput out) throws IOException {
172: out.writeObject(dest);
173: out.writeObject(src);
174: }
175:
176: public void readExternal(ObjectInput in) throws IOException,
177: ClassNotFoundException {
178: dest = (Address) in.readObject();
179: src = (Address) in.readObject();
180: }
181:
182: public void writeTo(DataOutputStream out) throws IOException {
183: Util.writeAddress(dest, out);
184: Util.writeAddress(src, out);
185: }
186:
187: public void readFrom(DataInputStream in) throws IOException,
188: IllegalAccessException, InstantiationException {
189: dest = Util.readAddress(in);
190: src = Util.readAddress(in);
191: }
192:
193: public String toString() {
194: return "dest=" + dest + ", src=" + src;
195: }
196: }
197:
198: }
|