001: package org.jgroups.protocols;
002:
003: import org.jgroups.*;
004: import org.jgroups.stack.Protocol;
005: import org.jgroups.util.Streamable;
006: import org.jgroups.util.TimeScheduler;
007: import org.jgroups.util.Util;
008:
009: import java.io.*;
010: import java.util.Properties;
011: import java.util.Vector;
012:
013: /**
014: * Periodically sends the view to the group. When a view is received which is greater than the current view, we
015: * install it. Otherwise we simply discard it. This is used to solve the problem for unreliable view
016: * dissemination outlined in JGroups/doc/ReliableViewInstallation.txt. This protocol is supposed to be just below GMS.
017: * @author Bela Ban
018: * @version $Id: VIEW_SYNC.java,v 1.10.2.1 2007/04/27 08:03:52 belaban Exp $
019: */
020: public class VIEW_SYNC extends Protocol {
021: Address local_addr = null;
022: final Vector mbrs = new Vector();
023: View my_view = null;
024: ViewId my_vid = null;
025:
026: /** Sends a VIEW_SYNC message to the group every 20 seconds on average. 0 disables sending of VIEW_SYNC messages */
027: long avg_send_interval = 60000;
028:
029: private int num_views_sent = 0;
030: private int num_views_adjusted = 0;
031:
032: private volatile ViewSendTask view_send_task = null; // bcasts periodic STABLE message (added to timer below)
033: final Object view_send_task_mutex = new Object(); // to sync on stable_task
034: TimeScheduler timer = null; // to send periodic STABLE msgs (and STABILITY messages)
035: static final String name = "VIEW_SYNC";
036:
037: public String getName() {
038: return name;
039: }
040:
041: public long getAverageSendInterval() {
042: return avg_send_interval;
043: }
044:
045: public void setAverageSendInterval(long gossip_interval) {
046: avg_send_interval = gossip_interval;
047: }
048:
049: public int getNumViewsSent() {
050: return num_views_sent;
051: }
052:
053: public int getNumViewsAdjusted() {
054: return num_views_adjusted;
055: }
056:
057: public void resetStats() {
058: super .resetStats();
059: num_views_adjusted = num_views_sent = 0;
060: }
061:
062: public boolean setProperties(Properties props) {
063: String str;
064:
065: super .setProperties(props);
066:
067: str = props.getProperty("avg_send_interval");
068: if (str != null) {
069: avg_send_interval = Long.parseLong(str);
070: props.remove("avg_send_interval");
071: }
072:
073: if (props.size() > 0) {
074: log.error("these properties are not recognized: " + props);
075: return false;
076: }
077: return true;
078: }
079:
080: public void start() throws Exception {
081: if (stack != null && stack.timer != null)
082: timer = stack.timer;
083: else
084: throw new Exception(
085: "timer cannot be retrieved from protocol stack");
086: }
087:
088: public void stop() {
089: stopViewSender();
090: }
091:
092: /** Sends a VIEW_SYNC_REQ to all members, every member replies with a VIEW multicast */
093: public void sendViewRequest() {
094: Message msg = new Message(null);
095: ViewSyncHeader hdr = new ViewSyncHeader(
096: ViewSyncHeader.VIEW_SYNC_REQ, null);
097: msg.putHeader(name, hdr);
098: passDown(new Event(Event.MSG, msg));
099: }
100:
101: // public void sendFakeViewForTestingOnly() {
102: // ViewId fake_vid=new ViewId(local_addr, my_vid.getId() +2);
103: // View fake_view=new View(fake_vid, new Vector(my_view.getMembers()));
104: // System.out.println("sending fake view " + fake_view);
105: // my_view=fake_view;
106: // my_vid=fake_vid;
107: // sendView();
108: // }
109:
110: public void up(Event evt) {
111: Message msg;
112: ViewSyncHeader hdr;
113: int type = evt.getType();
114:
115: switch (type) {
116:
117: case Event.MSG:
118: msg = (Message) evt.getArg();
119: hdr = (ViewSyncHeader) msg.removeHeader(name);
120: if (hdr == null)
121: break;
122: Address sender = msg.getSrc();
123: switch (hdr.type) {
124: case ViewSyncHeader.VIEW_SYNC:
125: handleView(hdr.view, sender);
126: break;
127: case ViewSyncHeader.VIEW_SYNC_REQ:
128: if (!sender.equals(local_addr))
129: sendView();
130: break;
131: default:
132: if (log.isErrorEnabled())
133: log.error("ViewSyncHeader type " + hdr.type
134: + " not known");
135: }
136: return;
137:
138: case Event.VIEW_CHANGE:
139: View view = (View) evt.getArg();
140: handleViewChange(view);
141: break;
142:
143: case Event.SET_LOCAL_ADDRESS:
144: local_addr = (Address) evt.getArg();
145: break;
146: }
147: passUp(evt);
148: }
149:
150: public void down(Event evt) {
151: switch (evt.getType()) {
152: case Event.VIEW_CHANGE:
153: View v = (View) evt.getArg();
154: handleViewChange(v);
155: break;
156: }
157: passDown(evt);
158: }
159:
160: /* --------------------------------------- Private Methods ---------------------------------------- */
161:
162: private void handleView(View v, Address sender) {
163: Vector members = v.getMembers();
164: if (!members.contains(local_addr)) {
165: if (log.isWarnEnabled())
166: log.warn("discarding view as I (" + local_addr
167: + ") am not member of view (" + v + ")");
168: return;
169: }
170:
171: ViewId vid = v.getVid();
172: int rc = vid.compareTo(my_vid);
173: if (rc > 0) { // foreign view is greater than my own view; update my own view !
174: if (log.isTraceEnabled())
175: log.trace("view from " + sender + " (" + vid
176: + ") is greater than my own view (" + my_vid
177: + ");" + " will update my own view");
178:
179: Message view_change = new Message(local_addr, local_addr,
180: null);
181: org.jgroups.protocols.pbcast.GMS.GmsHeader hdr;
182: hdr = new org.jgroups.protocols.pbcast.GMS.GmsHeader(
183: org.jgroups.protocols.pbcast.GMS.GmsHeader.VIEW, v);
184: view_change.putHeader(GMS.name, hdr);
185: passUp(new Event(Event.MSG, view_change));
186: num_views_adjusted++;
187: }
188: }
189:
190: private void handleViewChange(View view) {
191: Vector tmp = view.getMembers();
192: if (tmp != null) {
193: mbrs.clear();
194: mbrs.addAll(tmp);
195: }
196: my_view = (View) view.clone();
197: my_vid = my_view.getVid();
198: if (my_view.size() > 1
199: && (view_send_task == null || !view_send_task.running()))
200: startViewSender();
201: }
202:
203: private void sendView() {
204: View tmp = (View) (my_view != null ? my_view.clone() : null);
205: if (tmp == null)
206: return;
207: Message msg = new Message(null); // send to the group
208: ViewSyncHeader hdr = new ViewSyncHeader(
209: ViewSyncHeader.VIEW_SYNC, tmp);
210: msg.putHeader(name, hdr);
211: passDown(new Event(Event.MSG, msg));
212: num_views_sent++;
213: }
214:
215: void startViewSender() {
216: // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
217: // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
218: // 1 cycle: on the next message or view, we will start the task
219: if (view_send_task != null)
220: return;
221: synchronized (view_send_task_mutex) {
222: if (view_send_task != null && view_send_task.running()) {
223: return; // already running
224: }
225: view_send_task = new ViewSendTask();
226: timer.add(view_send_task, true); // fixed-rate scheduling
227: }
228: if (log.isTraceEnabled())
229: log.trace("view send task started");
230: }
231:
232: void stopViewSender() {
233: // contrary to startViewSender(), we don't need double-checked locking here because this method is not
234: // called frequently
235: synchronized (view_send_task_mutex) {
236: if (view_send_task != null) {
237: view_send_task.stop();
238: if (log.isTraceEnabled())
239: log.trace("view send task stopped");
240: view_send_task = null;
241: }
242: }
243: }
244:
245: /* ------------------------------------End of Private Methods ------------------------------------- */
246:
247: public static class ViewSyncHeader extends Header implements
248: Streamable {
249: public static final int VIEW_SYNC = 1; // contains a view
250: public static final int VIEW_SYNC_REQ = 2; // request to all members to send their views
251:
252: int type = 0;
253: View view = null;
254:
255: public ViewSyncHeader() {
256: }
257:
258: public ViewSyncHeader(int type, View view) {
259: this .type = type;
260: this .view = view;
261: }
262:
263: public int getType() {
264: return type;
265: }
266:
267: public View getView() {
268: return view;
269: }
270:
271: static String type2String(int t) {
272: switch (t) {
273: case VIEW_SYNC:
274: return "VIEW_SYNC";
275: case VIEW_SYNC_REQ:
276: return "VIEW_SYNC_REQ";
277: default:
278: return "<unknown>";
279: }
280: }
281:
282: public String toString() {
283: StringBuffer sb = new StringBuffer();
284: sb.append('[');
285: sb.append(type2String(type));
286: sb.append("]");
287: if (view != null)
288: sb.append(", view= ").append(view);
289: return sb.toString();
290: }
291:
292: public void writeExternal(ObjectOutput out) throws IOException {
293: out.writeInt(type);
294: if (view == null) {
295: out.writeBoolean(false);
296: return;
297: }
298: out.writeBoolean(true);
299: view.writeExternal(out);
300: }
301:
302: public void readExternal(ObjectInput in) throws IOException,
303: ClassNotFoundException {
304: type = in.readInt();
305: boolean available = in.readBoolean();
306: if (available) {
307: view = new View();
308: view.readExternal(in);
309: }
310: }
311:
312: public long size() {
313: long retval = Global.INT_SIZE + Global.BYTE_SIZE
314: + Global.BYTE_SIZE; // type + view type + presence for digest
315: if (view != null)
316: retval += view.serializedSize();
317: return retval;
318: }
319:
320: public void writeTo(DataOutputStream out) throws IOException {
321: out.writeInt(type);
322: // 0 == null, 1 == View, 2 == MergeView
323: byte b = (byte) (view == null ? 0
324: : (view instanceof MergeView ? 2 : 1));
325: out.writeByte(b);
326: Util.writeStreamable(view, out);
327: }
328:
329: public void readFrom(DataInputStream in) throws IOException,
330: IllegalAccessException, InstantiationException {
331: type = in.readInt();
332: byte b = in.readByte();
333: Class clazz = b == 2 ? MergeView.class : View.class;
334: view = (View) Util.readStreamable(clazz, in);
335: }
336:
337: }
338:
339: /**
340: Periodically multicasts a View_SYNC message
341: */
342: private class ViewSendTask implements TimeScheduler.Task {
343: boolean stopped = false;
344:
345: public void stop() {
346: stopped = true;
347: }
348:
349: public boolean running() { // syntactic sugar
350: return !stopped;
351: }
352:
353: public boolean cancelled() {
354: return stopped;
355: }
356:
357: public long nextInterval() {
358: long interval = computeSleepTime();
359: if (interval <= 0)
360: return 10000;
361: else
362: return interval;
363: }
364:
365: public void run() {
366: sendView();
367: }
368:
369: long computeSleepTime() {
370: int num_mbrs = Math.max(mbrs.size(), 1);
371: return getRandom((num_mbrs * avg_send_interval * 2));
372: }
373:
374: long getRandom(long range) {
375: return (long) ((Math.random() * range) % range);
376: }
377: }
378:
379: }
|