001: // $Id: CAUSAL.java,v 1.8 2005/07/17 11:36:15 chrislott Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007:
008: import java.io.IOException;
009: import java.io.ObjectInput;
010: import java.io.ObjectOutput;
011: import java.util.LinkedList;
012: import java.util.ListIterator;
013: import java.util.Vector;
014:
015: /** <p>
016: * Implements casual ordering layer using vector clocks.
017: * </p>
018: * <p>
019: * Causal protocol layer guarantees that if message m0 multicasted
020: * by a process group member p0 causes process group member
021: * p1 to multicast message p1 then all other remaining process group
022: * members in a current view will receive messages in order m0
023: * followed by m1.
024: * </p>
025: * <p>
026: * First time encountered, causal order seems very similar to FIFO order but
027: * there is an important distinction. While FIFO order gurantees that
028: * if process group member p0 multicasts m0 followed by m1 the messages
029: * will be delivered in order m0,m1 to all other group members, causal
030: * order expands this notion of an order from a single group member "space"
031: * to a whole group space i.e if p0 sends message m0 which causes member
032: * p1 to send message m1 then all other group members are guaranteed to
033: * receive m0 followed by m1.
034: * </p>
035: * <p>
036: * Causal protocol layer achieves this ordering type by introducing sense of
037: * a time in a group using vector clocks. The idea is very simple. Each message
038: * is labeled by a vector, contained in a causal header, representing the number of
039: * prior causal messages received by the sending group member. Vector time of [3,5,2,4] in
040: * a group of four members [p0,p1,p2,p3] means that process p0 has sent 3 messages
041: * and has received 5,2 and 4 messages from a member p1,p2 and p3 respectively.
042: * </p>
043: * <p>
044: * Each member increases its counter by 1 when it sends a message. When receiving
045: * message mi from a member pi , (where pi != pj) containing vector time VT(mi),
046: * process pj delays delivery of a message mi until:
047: * </p>
048: * <p>
049: * for every k:1..n
050: *
051: * VT(mi)[k] == VT(pj)[k] + 1 if k=i,
052: * VT(mi)[k] <= VT(pj)[k] otherwise
053: * </p>
054: * <p>
055: * After the next causal message is delivered at process group pj, VT(pj) is
056: * updated as follows:
057: *</p>
058: *<p>
059: * for every k:1...n VT(pj)[k] == max(VT(mi)[k],VT(pj)[k])
060: *</p>
061: * @author Vladimir Blagojevic vladimir@cs.yorku.ca
062: * @version $Revision: 1.8 $
063: *
064: **/
065:
066: public class CAUSAL extends Protocol {
067:
068: public static class CausalHeader extends Header {
069: /**
070: * vector timestamp of this header/message
071: */
072: private TransportedVectorTime t;
073:
074: /**
075: *used for externalization
076: */
077: public CausalHeader() {
078: }
079:
080: public CausalHeader(TransportedVectorTime timeVector) {
081: t = timeVector;
082: }
083:
084: /**
085: *Returns a vector timestamp carreid by this header
086: *@return Vector timestamp contained in this header
087: */
088: public TransportedVectorTime getVectorTime() {
089: return t;
090: }
091:
092: /**
093: * Size of this vector timestamp estimation, used in fragmetation
094: * @return headersize in bytes
095: */
096: public long size() {
097:
098: /*why 231, don't know but these are this values I get when
099: flattening the object into byte buffer*/
100: return 231 + (t.size() * 4);
101: }
102:
103: /**
104: * Manual serialization
105: *
106: *
107: */
108: public void writeExternal(ObjectOutput out) throws IOException {
109: out.writeObject(t);
110: }
111:
112: /**
113: * Manual deserialization
114: *
115: */
116: public void readExternal(ObjectInput in) throws IOException,
117: ClassNotFoundException {
118: t = (TransportedVectorTime) in.readObject();
119: }
120:
121: public String toString() {
122: return "[CAUSALHEADER:" + t + ']';
123: }
124: }
125:
126: /**
127: *Vector time clock belonging to the member that "owns" this stack
128: */
129: private VectorTime localVector;
130:
131: /**
132: * dealy queue containg messages waiting for the delivery i.e causal order
133: */
134: private LinkedList delayQueue;
135:
136: /**
137: *Address of this group member
138: */
139: private Address localAddress;
140:
141: /**
142: *default constructor
143: */
144: public CAUSAL() {
145: }
146:
147: /**
148: * Adds a vectortimestamp to a sorted queue
149: * @param tvt A vector time stamp
150: */
151: private void addToDelayQueue(TransportedVectorTime tvt) {
152: ListIterator i = delayQueue.listIterator(0);
153: TransportedVectorTime current = null;
154: while (i.hasNext()) {
155: current = (TransportedVectorTime) i.next();
156: if (tvt.lessThanOrEqual(current)) {
157: delayQueue.add(i.previousIndex(), tvt);
158: return;
159: }
160: }
161: delayQueue.add(tvt);
162: }
163:
164: /**
165: * Processes Event going down in the stack
166: * @param evt Event passed from the stack above Causal
167: */
168: public void down(Event evt) {
169: switch (evt.getType()) {
170: case Event.MSG:
171: Message msg = (Message) evt.getArg();
172:
173: //dont stamp unicasts
174: if (msg.getDest() != null
175: && !msg.getDest().isMulticastAddress())
176: break;
177:
178: Message causalMsg = new Message(msg.getDest(),
179: msg.getSrc(), msg);
180: synchronized (this ) {
181: localVector.increment();
182: causalMsg.putHeader(getName(), new CausalHeader(
183: localVector.getTransportedVectorTime()));
184: }
185: passDown(new Event(Event.MSG, causalMsg));
186: return;
187: }
188: passDown(evt);
189: }
190:
191: /**
192: * Processes Event going up through the stack
193: * @param evt Event passed from the stack below Causal
194: */
195: public void up(Event evt) {
196: switch (evt.getType()) {
197: case Event.SET_LOCAL_ADDRESS:
198: localAddress = (Address) evt.getArg();
199: localVector = new VectorTime(localAddress);
200: delayQueue = new LinkedList();
201: break;
202:
203: case Event.VIEW_CHANGE:
204: Vector newViewMembers = ((View) evt.getArg()).getMembers();
205: localVector.merge((Vector) newViewMembers.clone());
206: localVector.reset();
207: break;
208:
209: case Event.MSG:
210: Object obj = null;
211: Message msg = (Message) evt.getArg();
212:
213: if (!((obj = msg.getHeader(getName())) instanceof CausalHeader)) {
214: if ((msg.getDest() == null || msg.getDest()
215: .isMulticastAddress())
216: && log.isErrorEnabled())
217: log.error("NO CAUSAL.Header found");
218: passUp(evt);
219: return;
220: }
221:
222: CausalHeader header = (CausalHeader) obj;
223: TransportedVectorTime messageVector = header
224: .getVectorTime();
225:
226: synchronized (this ) {
227: if (localVector.isCausallyNext(messageVector)) {
228: Message tmp = (Message) msg.getObject();
229: tmp.setSrc(msg.getSrc());
230: passUp(new Event(Event.MSG, tmp));
231: localVector.max(messageVector);
232: } else {
233: messageVector.setAssociatedMessage(msg);
234: addToDelayQueue(messageVector);
235: }
236: TransportedVectorTime queuedVector = null;
237: while ((delayQueue.size() > 0)
238: && localVector
239: .isCausallyNext((queuedVector = (TransportedVectorTime) delayQueue
240: .getFirst()))) {
241: delayQueue.remove(queuedVector);
242: Object tmp = queuedVector.getAssociatedMessage()
243: .getObject();
244: passUp(new Event(Event.MSG, tmp));
245: localVector.max(queuedVector);
246: }
247: return;
248: }
249:
250: }
251: passUp(evt);
252: }
253:
254: /**
255: * Returns a name of this stack, each stackhas to have unique name
256: * @return stack's name - CAUSAL
257: */
258: public String getName() {
259: return "CAUSAL";
260: }
261:
262: }
|