001: package org.jgroups.protocols;
002:
003: import org.jgroups.*;
004: import org.jgroups.stack.Protocol;
005:
006: import java.io.IOException;
007: import java.io.ObjectInput;
008: import java.io.ObjectOutput;
009: import java.util.Vector;
010:
011: /**
012: * The coordinator attaches a small header to each (or every nth) message. If another coordinator <em>in the
013: * same group</em> sees the message, it will initiate the merge protocol immediately by sending a MERGE
014: * event up the stack.
015: * @author Bela Ban, Aug 25 2003
016: */
017: public class MERGEFAST extends Protocol {
018: Address local_addr = null;
019: boolean is_coord = false;
020: static final String name = "MERGEFAST";
021:
022: public String getName() {
023: return name;
024: }
025:
026: public void down(Event evt) {
027: if (is_coord == true && evt.getType() == Event.MSG
028: && local_addr != null) {
029: Message msg = (Message) evt.getArg();
030: Address dest = msg.getDest();
031: if (dest == null || dest.isMulticastAddress()) {
032: msg.putHeader(getName(),
033: new MergefastHeader(local_addr));
034: }
035: }
036:
037: if (evt.getType() == Event.VIEW_CHANGE) {
038: handleViewChange((View) evt.getArg());
039: }
040:
041: passDown(evt);
042: }
043:
044: public void up(Event evt) {
045: switch (evt.getType()) {
046: case Event.SET_LOCAL_ADDRESS:
047: local_addr = (Address) evt.getArg();
048: break;
049: case Event.MSG:
050: if (is_coord == false) // only handle message if we are coordinator
051: break;
052: Message msg = (Message) evt.getArg();
053: MergefastHeader hdr = (MergefastHeader) msg
054: .removeHeader(name);
055: passUp(evt);
056: if (hdr != null && local_addr != null) {
057: Address other_coord = hdr.coord;
058: if (!local_addr.equals(other_coord)) {
059: sendUpMerge(new Address[] { local_addr, other_coord });
060: }
061: }
062: return; // event was already passed up
063: case Event.VIEW_CHANGE:
064: handleViewChange((View) evt.getArg());
065: break;
066: }
067: passUp(evt);
068: }
069:
070: void handleViewChange(View v) {
071: Vector mbrs;
072: if (local_addr == null)
073: return;
074: mbrs = v.getMembers();
075: is_coord = mbrs != null && mbrs.size() > 0
076: && local_addr.equals(mbrs.firstElement());
077: }
078:
079: /**
080: * @todo avoid sending up too many MERGE events.
081: */
082: void sendUpMerge(Address[] addresses) {
083: Vector v = new Vector(11);
084: for (int i = 0; i < addresses.length; i++) {
085: Address addr = addresses[i];
086: v.add(addr);
087: }
088: passUp(new Event(Event.MERGE, v));
089: }
090:
091: public static class MergefastHeader extends Header {
092: Address coord = null;
093:
094: public MergefastHeader() {
095: }
096:
097: public MergefastHeader(Address coord) {
098: this .coord = coord;
099: }
100:
101: public void writeExternal(ObjectOutput out) throws IOException {
102: out.writeObject(coord);
103: }
104:
105: public void readExternal(ObjectInput in) throws IOException,
106: ClassNotFoundException {
107: coord = (Address) in.readObject();
108: }
109:
110: }
111:
112: }
|