001: package org.jgroups.protocols;
002:
003: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
004: import org.jgroups.*;
005: import org.jgroups.stack.Protocol;
006: import org.jgroups.util.Streamable;
007: import org.jgroups.util.Util;
008:
009: import java.io.*;
010: import java.util.*;
011:
012: /**
013: * Implementation of total order protocol using a sequencer. Consult doc/design/SEQUENCER.txt for details
014: * @author Bela Ban
015: * @version $Id: SEQUENCER.java,v 1.11.2.1 2007/04/27 08:03:52 belaban Exp $
016: */
017: public class SEQUENCER extends Protocol {
018: private Address local_addr = null, coord = null;
019: static final String name = "SEQUENCER";
020: private boolean is_coord = false;
021: private long seqno = 0;
022:
023: /** Map<seqno, Message>: maintains messages forwarded to the coord which which no ack has been received yet */
024: private final Map forward_table = new TreeMap();
025:
026: /** Map<Address, seqno>: maintains the highest seqnos seen for a given member */
027: private final ConcurrentHashMap received_table = new ConcurrentHashMap();
028:
029: private long forwarded_msgs = 0;
030: private long bcast_msgs = 0;
031: private long received_forwards = 0;
032: private long received_bcasts = 0;
033:
034: public boolean isCoordinator() {
035: return is_coord;
036: }
037:
038: public Address getCoordinator() {
039: return coord;
040: }
041:
042: public Address getLocalAddress() {
043: return local_addr;
044: }
045:
046: public String getName() {
047: return name;
048: }
049:
050: public long getForwarded() {
051: return forwarded_msgs;
052: }
053:
054: public long getBroadcast() {
055: return bcast_msgs;
056: }
057:
058: public long getReceivedForwards() {
059: return received_forwards;
060: }
061:
062: public long getReceivedBroadcasts() {
063: return received_bcasts;
064: }
065:
066: public void resetStats() {
067: forwarded_msgs = bcast_msgs = received_forwards = received_bcasts = 0L;
068: }
069:
070: public Map dumpStats() {
071: Map m = super .dumpStats();
072: if (m == null)
073: m = new HashMap();
074: m.put("forwarded", new Long(forwarded_msgs));
075: m.put("broadcast", new Long(bcast_msgs));
076: m.put("received_forwards", new Long(received_forwards));
077: m.put("received_bcasts", new Long(received_bcasts));
078: return m;
079: }
080:
081: public String printStats() {
082: return dumpStats().toString();
083: }
084:
085: public boolean setProperties(Properties props) {
086: super .setProperties(props);
087:
088: if (props.size() > 0) {
089: log.error("the following properties are not recognized: "
090: + props);
091: return false;
092: }
093: return true;
094: }
095:
096: private final long nextSeqno() {
097: synchronized (this ) {
098: return seqno++;
099: }
100: }
101:
102: public void down(Event evt) {
103: switch (evt.getType()) {
104: case Event.MSG:
105: Message msg = (Message) evt.getArg();
106: Address dest = msg.getDest();
107: if (dest == null || dest.isMulticastAddress()) { // only handle multicasts
108: long next_seqno = nextSeqno();
109: SequencerHeader hdr = new SequencerHeader(
110: SequencerHeader.FORWARD, local_addr, next_seqno);
111: msg.putHeader(name, hdr);
112: if (!is_coord) {
113: forwardToCoord(msg, next_seqno);
114: } else {
115: broadcast(msg);
116: }
117: return; // don't pass down
118: }
119: break;
120:
121: case Event.VIEW_CHANGE:
122: handleViewChange((View) evt.getArg());
123: break;
124: }
125: passDown(evt);
126: }
127:
128: public void up(Event evt) {
129: Message msg;
130: SequencerHeader hdr;
131:
132: switch (evt.getType()) {
133:
134: case Event.SET_LOCAL_ADDRESS:
135: local_addr = (Address) evt.getArg();
136: break;
137:
138: case Event.MSG:
139: msg = (Message) evt.getArg();
140: hdr = (SequencerHeader) msg.getHeader(name);
141: if (hdr == null)
142: break; // pass up
143:
144: switch (hdr.type) {
145: case SequencerHeader.FORWARD:
146: if (!is_coord) {
147: if (log.isErrorEnabled())
148: log
149: .warn("I ("
150: + local_addr
151: + ") am not the coord and don't handle "
152: + "FORWARD requests, ignoring request");
153: return;
154: }
155: broadcast(msg);
156: received_forwards++;
157: return;
158: case SequencerHeader.BCAST:
159: deliver(msg, hdr); // deliver a copy and return (discard the original msg)
160: received_bcasts++;
161: return;
162: }
163: break;
164:
165: case Event.VIEW_CHANGE:
166: handleViewChange((View) evt.getArg());
167: break;
168: }
169:
170: passUp(evt);
171: }
172:
173: /* --------------------------------- Private Methods ----------------------------------- */
174:
175: private void handleViewChange(View v) {
176: Vector members = v.getMembers();
177: if (members.size() == 0)
178: return;
179:
180: Address prev_coord = coord;
181: coord = (Address) members.firstElement();
182: is_coord = local_addr != null && local_addr.equals(coord);
183:
184: boolean coord_changed = prev_coord != null
185: && !prev_coord.equals(coord);
186: if (coord_changed) {
187: resendMessagesInForwardTable(); // maybe optimize in the future: broadcast directly if coord
188: }
189: // remove left members from received_table
190: int size = received_table.size();
191: Set keys = received_table.keySet();
192: keys.retainAll(members);
193: if (keys.size() != size) {
194: if (log.isTraceEnabled())
195: log.trace("adjusted received_table, keys are " + keys);
196: }
197: }
198:
199: /**
200: * Sends all messages currently in forward_table to the new coordinator (changing the dest field).
201: * This needs to be done, so the underlying reliable unicast protocol (e.g. UNICAST) adds these messages
202: * to its retransmission mechanism<br/>
203: * Note that we need to resend the messages in order of their seqnos ! We also need to prevent other message
204: * from being inserted until we're done, that's why there's synchronization.
205: */
206: private void resendMessagesInForwardTable() {
207: Message msg;
208: synchronized (forward_table) {
209: for (Iterator it = forward_table.values().iterator(); it
210: .hasNext();) {
211: msg = (Message) it.next();
212: msg.setDest(coord);
213: passDown(new Event(Event.MSG, msg));
214: }
215: }
216: }
217:
218: private void forwardToCoord(Message msg, long seqno) {
219: msg.setDest(coord); // we change the message dest from multicast to unicast (to coord)
220: synchronized (forward_table) {
221: forward_table.put(new Long(seqno), msg);
222: }
223: passDown(new Event(Event.MSG, msg));
224: forwarded_msgs++;
225: }
226:
227: private void broadcast(Message msg) {
228: SequencerHeader hdr = (SequencerHeader) msg.getHeader(name);
229: hdr.type = SequencerHeader.BCAST; // we change the type of header, but leave the tag intact
230: msg.setDest(null); // mcast
231: msg.setSrc(local_addr); // the coord is sending it - this will be replaced with sender in deliver()
232: passDown(new Event(Event.MSG, msg));
233: bcast_msgs++;
234: }
235:
236: /**
237: * We copy the message in order to change the sender's address. If we did this on the original message,
238: * retransmission would likely run into problems, and possibly also stability (STABLE) of messages
239: * @param msg
240: * @param hdr
241: */
242: private void deliver(Message msg, SequencerHeader hdr) {
243: Address original_sender = hdr.getOriginalSender();
244: if (original_sender == null) {
245: if (log.isErrorEnabled())
246: log
247: .error("original sender is null, cannot swap sender address back to original sender");
248: return;
249: }
250: long msg_seqno = hdr.getSeqno();
251:
252: // this is the ack for the message sent by myself
253: if (original_sender.equals(local_addr)) {
254: synchronized (forward_table) {
255: forward_table.remove(new Long(msg_seqno));
256: }
257: }
258:
259: // if msg was already delivered, discard it
260: Long highest_seqno_seen = (Long) received_table
261: .get(original_sender);
262: if (highest_seqno_seen != null) {
263: if (highest_seqno_seen.longValue() >= msg_seqno) {
264: if (log.isWarnEnabled())
265: log.warn("message seqno (" + original_sender + "::"
266: + msg_seqno + " has already "
267: + "been received (highest received="
268: + highest_seqno_seen
269: + "); discarding duplicate message");
270: return;
271: }
272: }
273: // update the table with the new seqno
274: received_table.put(original_sender, new Long(msg_seqno));
275:
276: // pass a copy of the message up the stack
277: Message tmp = msg.copy(true);
278: tmp.setSrc(original_sender);
279: passUp(new Event(Event.MSG, tmp));
280: }
281:
282: /* ----------------------------- End of Private Methods -------------------------------- */
283:
284: public static class SequencerHeader extends Header implements
285: Streamable {
286: static final byte FORWARD = 1;
287: static final byte BCAST = 2;
288:
289: byte type = -1;
290: /** the original sender's address and a seqno */
291: ViewId tag = null;
292:
293: public SequencerHeader() {
294: }
295:
296: public SequencerHeader(byte type, Address original_sender,
297: long seqno) {
298: this .type = type;
299: this .tag = new ViewId(original_sender, seqno);
300: }
301:
302: public Address getOriginalSender() {
303: return tag != null ? tag.getCoordAddress() : null;
304: }
305:
306: public long getSeqno() {
307: return tag != null ? tag.getId() : -1;
308: }
309:
310: public String toString() {
311: StringBuffer sb = new StringBuffer(64);
312: sb.append(printType());
313: if (tag != null)
314: sb.append(" (tag=").append(tag).append(")");
315: return sb.toString();
316: }
317:
318: private final String printType() {
319: switch (type) {
320: case FORWARD:
321: return "FORWARD";
322: case BCAST:
323: return "BCAST";
324: default:
325: return "n/a";
326: }
327: }
328:
329: public void writeExternal(ObjectOutput out) throws IOException {
330: out.writeByte(type);
331: out.writeObject(tag);
332: }
333:
334: public void readExternal(ObjectInput in) throws IOException,
335: ClassNotFoundException {
336: type = in.readByte();
337: tag = (ViewId) in.readObject();
338: }
339:
340: public void writeTo(DataOutputStream out) throws IOException {
341: out.writeByte(type);
342: Util.writeStreamable(tag, out);
343: }
344:
345: public void readFrom(DataInputStream in) throws IOException,
346: IllegalAccessException, InstantiationException {
347: type = in.readByte();
348: tag = (ViewId) Util.readStreamable(ViewId.class, in);
349: }
350:
351: public long size() {
352: long size = Global.BYTE_SIZE * 2; // type + presence byte
353: if (tag != null)
354: size += tag.serializedSize();
355: return size;
356: }
357:
358: }
359:
360: }
|