001: package org.jgroups.protocols.pbcast;
002:
003: import org.jgroups.*;
004: import org.jgroups.stack.Protocol;
005: import org.jgroups.stack.StateTransferInfo;
006: import org.jgroups.util.Promise;
007: import org.jgroups.util.Streamable;
008: import org.jgroups.util.Util;
009:
010: import java.io.*;
011: import java.util.*;
012:
013: /**
014: * New STATE_TRANSFER protocol based on PBCAST. Compared to the one in ./protocols, it doesn't
015: * need a QUEUE layer above it. A state request is sent to a chosen member (coordinator if
016: * null). That member makes a copy D of its current digest and asks the application for a copy of
017: * its current state S. Then the member returns both S and D to the requester. The requester
018: * first sets its digest to D and then returns the state to the application.
019: * @author Bela Ban
020: * @version $Id: STATE_TRANSFER.java,v 1.44.2.3 2007/04/27 08:03:55 belaban Exp $
021: */
022: public class STATE_TRANSFER extends Protocol {
/** Address of this member; assigned when a SET_LOCAL_ADDRESS event travels up the stack. */
Address local_addr = null;

/** Current membership, replaced on every TMP_VIEW / VIEW_CHANGE. Accessed while synchronized on the Vector itself. */
final Vector members = new Vector();

/**
 * Counter copied into every outgoing STATE_REQ header and incremented afterwards. Nothing in this
 * class reads the value back from a received header.
 */
long state_id = 1;

/**
 * Map<String,Set> of state requesters. Keys are state IDs (a null key stands for the entire state),
 * values are Sets of Addresses (one for each requester). Accessed while synchronized on the map itself.
 */
final Map state_requesters = new HashMap();

/** set to true while waiting for a STATE_RSP */
boolean waiting_for_state_response = false;

/** Digest delivered by GET_DIGEST_STATE_OK; shipped to requesters inside the STATE_RSP header. */
Digest digest = null;

/** Configuration information sent up in a CONFIG event by start(). */
final HashMap map = new HashMap();

/** Timestamps (System.currentTimeMillis()) used to measure the duration of a state transfer. */
long start, stop;

// statistics, updated when a state is handed out and the protocol's stats flag is set
int num_state_reqs = 0;
long num_bytes_sent = 0;
double avg_state_size = 0;

final static String name = "STATE_TRANSFER";

/** Whether a flush (SUSPEND / RESUME) is run around a state transfer. */
boolean use_flush = false;

/** Time (in msec) to wait for the outcome of a flush. */
long flush_timeout = 4000;

/**
 * Receives the flush outcome (SUSPEND_OK / SUSPEND_FAILED). Created eagerly so it is never null:
 * use_flush can also be enabled by a CONFIG event, independently of setProperties().
 */
Promise flush_promise = new Promise();

/** Set from CONFIG events passed down the stack; consulted by start(). */
boolean flushProtocolInStack = false;
046:
047: /** All protocol names have to be unique ! */
048: public String getName() {
049: return name;
050: }
051:
052: public int getNumberOfStateRequests() {
053: return num_state_reqs;
054: }
055:
056: public long getNumberOfStateBytesSent() {
057: return num_bytes_sent;
058: }
059:
060: public double getAverageStateSize() {
061: return avg_state_size;
062: }
063:
064: public Vector requiredDownServices() {
065: Vector retval = new Vector();
066: retval.addElement(new Integer(Event.GET_DIGEST_STATE));
067: retval.addElement(new Integer(Event.SET_DIGEST));
068: return retval;
069: }
070:
071: public void resetStats() {
072: super .resetStats();
073: num_state_reqs = 0;
074: num_bytes_sent = 0;
075: avg_state_size = 0;
076: }
077:
078: public boolean setProperties(Properties props) {
079: super .setProperties(props);
080:
081: use_flush = Util.parseBoolean(props, "use_flush", false);
082: flush_promise = new Promise();
083:
084: flush_timeout = Util.parseLong(props, "flush_timeout",
085: flush_timeout);
086: if (props.size() > 0) {
087: log.error("the following properties are not recognized: "
088: + props);
089: return false;
090: }
091: return true;
092: }
093:
094: public void init() throws Exception {
095: map.put("state_transfer", Boolean.TRUE);
096: map.put("protocol_class", getClass().getName());
097: }
098:
099: public void start() throws Exception {
100: passUp(new Event(Event.CONFIG, map));
101: if (!flushProtocolInStack && use_flush) {
102: log
103: .warn("use_flush property is true, however, FLUSH protocol not found in stack");
104: use_flush = false;
105: }
106: }
107:
108: public void stop() {
109: super .stop();
110: waiting_for_state_response = false;
111: }
112:
113: public void up(Event evt) {
114: Message msg;
115: StateHeader hdr;
116:
117: switch (evt.getType()) {
118:
119: case Event.BECOME_SERVER:
120: break;
121:
122: case Event.SET_LOCAL_ADDRESS:
123: local_addr = (Address) evt.getArg();
124: break;
125:
126: case Event.TMP_VIEW:
127: case Event.VIEW_CHANGE:
128: handleViewChange((View) evt.getArg());
129: break;
130:
131: case Event.GET_DIGEST_STATE_OK:
132: synchronized (state_requesters) {
133: digest = (Digest) evt.getArg();
134: if (log.isDebugEnabled())
135: log.debug("GET_DIGEST_STATE_OK: digest is "
136: + digest + "\npassUp(GET_APPLSTATE)");
137:
138: requestApplicationStates();
139: }
140: return;
141:
142: case Event.MSG:
143: msg = (Message) evt.getArg();
144: if (!(msg.getHeader(name) instanceof StateHeader))
145: break;
146:
147: hdr = (StateHeader) msg.removeHeader(name);
148: switch (hdr.type) {
149: case StateHeader.STATE_REQ:
150: handleStateReq(hdr);
151: break;
152: case StateHeader.STATE_RSP:
153: handleStateRsp(hdr, msg.getBuffer());
154: if (use_flush) {
155: stopFlush();
156: }
157: break;
158: default:
159: if (log.isErrorEnabled())
160: log.error("type " + hdr.type
161: + " not known in StateHeader");
162: break;
163: }
164: return;
165: }
166: passUp(evt);
167: }
168:
169: public void down(Event evt) {
170: byte[] state;
171: Address target, requester;
172: StateTransferInfo info;
173: StateHeader hdr;
174:
175: switch (evt.getType()) {
176:
177: case Event.TMP_VIEW:
178: case Event.VIEW_CHANGE:
179: handleViewChange((View) evt.getArg());
180: break;
181:
182: // generated by JChannel.getState(). currently, getting the state from more than 1 mbr is not implemented
183: case Event.GET_STATE:
184: info = (StateTransferInfo) evt.getArg();
185: if (info.target == null) {
186: target = determineCoordinator();
187: } else {
188: target = info.target;
189: if (target.equals(local_addr)) {
190: if (log.isErrorEnabled())
191: log
192: .error("GET_STATE: cannot fetch state from myself !");
193: target = null;
194: }
195: }
196: if (target == null) {
197: if (log.isDebugEnabled())
198: log.debug("GET_STATE: first member (no state)");
199: passUp(new Event(Event.GET_STATE_OK,
200: new StateTransferInfo()));
201: } else {
202: boolean successfulFlush = false;
203: if (use_flush) {
204: successfulFlush = startFlush(flush_timeout, 5);
205: }
206: if (successfulFlush) {
207: log.info("Successful flush at " + local_addr);
208: }
209: Message state_req = new Message(target, null, null);
210: state_req.putHeader(name, new StateHeader(
211: StateHeader.STATE_REQ, local_addr, state_id++,
212: null, info.state_id));
213: if (log.isDebugEnabled())
214: log.debug("GET_STATE: asking " + target
215: + " for state");
216:
217: // suspend sending and handling of mesage garbage collection gossip messages,
218: // fixes bugs #943480 and #938584). Wake up when state has been received
219: if (log.isDebugEnabled())
220: log.debug("passing down a SUSPEND_STABLE event");
221: passDown(new Event(Event.SUSPEND_STABLE, new Long(
222: info.timeout)));
223: waiting_for_state_response = true;
224: start = System.currentTimeMillis();
225: passDown(new Event(Event.MSG, state_req));
226: }
227: return; // don't pass down any further !
228:
229: case Event.GET_APPLSTATE_OK:
230: info = (StateTransferInfo) evt.getArg();
231: state = info.state;
232: String id = info.state_id;
233: synchronized (state_requesters) {
234: if (state_requesters.size() == 0) {
235: if (log.isWarnEnabled())
236: log
237: .warn("GET_APPLSTATE_OK: received application state, but there are no requesters !");
238: return;
239: }
240: if (isDigestNeeded()) {
241: if (digest == null) {
242: if (log.isWarnEnabled())
243: log
244: .warn("GET_APPLSTATE_OK: received application state, but there is no digest !");
245: } else {
246: digest = digest.copy();
247: }
248: }
249: if (stats) {
250: num_state_reqs++;
251: if (state != null)
252: num_bytes_sent += state.length;
253: avg_state_size = num_bytes_sent / num_state_reqs;
254: }
255:
256: Set requesters = (Set) state_requesters.get(id);
257: if (requesters == null || requesters.size() == 0) {
258: log
259: .warn("received state for id="
260: + id
261: + ", but there are no requesters for this ID");
262: } else {
263: for (Iterator it = requesters.iterator(); it
264: .hasNext();) {
265: requester = (Address) it.next();
266: final Message state_rsp = new Message(
267: requester, null, state);
268: hdr = new StateHeader(StateHeader.STATE_RSP,
269: local_addr, 0, digest, id);
270: state_rsp.putHeader(name, hdr);
271: if (log.isTraceEnabled())
272: log.trace("sending state for ID=" + id
273: + " to " + requester + " ("
274: + state.length + " bytes)");
275:
276: // This has to be done in a separate thread, so we don't block on FC
277: // (see http://jira.jboss.com/jira/browse/JGRP-225 for details). This will be reverted once
278: // we have the threadless stack (http://jira.jboss.com/jira/browse/JGRP-181)
279: // and out-of-band messages (http://jira.jboss.com/jira/browse/JGRP-205)
280: new Thread() {
281: public void run() {
282: passDown(new Event(Event.MSG, state_rsp));
283: }
284: }.start();
285: // passDown(new Event(Event.MSG, state_rsp));
286: }
287: state_requesters.remove(id);
288: }
289: }
290: return; // don't pass down any further !
291: case Event.SUSPEND_OK:
292: if (use_flush) {
293: flush_promise.setResult(Boolean.TRUE);
294: }
295: break;
296: case Event.SUSPEND_FAILED:
297: if (use_flush) {
298: flush_promise.setResult(Boolean.FALSE);
299: }
300: break;
301:
302: case Event.CONFIG:
303: Map config = (Map) evt.getArg();
304: if (config != null && config.containsKey("flush_timeout")) {
305: Long ftimeout = (Long) config.get("flush_timeout");
306: use_flush = true;
307: flush_timeout = ftimeout.longValue();
308: }
309: if ((config != null && !config
310: .containsKey("flush_suported"))) {
311: flushProtocolInStack = true;
312: }
313: break;
314:
315: }
316:
317: passDown(evt); // pass on to the layer below us
318: }
319:
320: /* --------------------------- Private Methods -------------------------------- */
321:
322: /**
323: * When FLUSH is used we do not need to pass digests between members
324: *
325: * see JGroups/doc/design/PartialStateTransfer.txt
326: * see JGroups/doc/design/FLUSH.txt
327: *
328: * @return true if use of digests is required, false otherwise
329: */
330: private boolean isDigestNeeded() {
331: return !use_flush;
332: }
333:
334: private void requestApplicationStates() {
335: synchronized (state_requesters) {
336: Set appl_ids = new HashSet(state_requesters.keySet());
337: String id;
338: for (Iterator it = appl_ids.iterator(); it.hasNext();) {
339: id = (String) it.next();
340: StateTransferInfo info = new StateTransferInfo(null,
341: id, 0L, null);
342: passUp(new Event(Event.GET_APPLSTATE, info));
343: }
344: }
345: }
346:
347: /** Return the first element of members which is not me. Otherwise return null. */
348: private Address determineCoordinator() {
349: Address ret = null;
350: synchronized (members) {
351: if (members != null && members.size() > 1) {
352: for (int i = 0; i < members.size(); i++)
353: if (!local_addr.equals(members.elementAt(i)))
354: return (Address) members.elementAt(i);
355: }
356: }
357: return ret;
358: }
359:
360: private void handleViewChange(View v) {
361: Address old_coord;
362: Vector new_members = v.getMembers();
363: boolean send_up_null_state_rsp = false;
364:
365: synchronized (members) {
366: old_coord = (Address) (members.size() > 0 ? members
367: .firstElement() : null);
368: members.clear();
369: members.addAll(new_members);
370:
371: // this handles the case where a coord dies during a state transfer; prevents clients from hanging forever
372: // Note this only takes a coordinator crash into account, a getState(target, timeout), where target is not
373: // null is not handled ! (Usually we get the state from the coordinator)
374: // http://jira.jboss.com/jira/browse/JGRP-148
375: if (waiting_for_state_response && old_coord != null
376: && !members.contains(old_coord)) {
377: send_up_null_state_rsp = true;
378: }
379: }
380:
381: if (send_up_null_state_rsp) {
382: if (log.isWarnEnabled())
383: log
384: .warn("discovered that the state provider ("
385: + old_coord
386: + ") crashed; will return null state to application");
387: StateHeader hdr = new StateHeader(StateHeader.STATE_RSP,
388: local_addr, 0, null, null);
389: handleStateRsp(hdr, null); // sends up null GET_STATE_OK
390: }
391: }
392:
393: /**
394: * If a state transfer is in progress, we don't need to send a GET_APPLSTATE event to the application, but
395: * instead we just add the sender to the requester list so it will receive the same state when done. If not,
396: * we add the sender to the requester list and send a GET_APPLSTATE event up.
397: */
398: private void handleStateReq(StateHeader hdr) {
399: Object sender = hdr.sender;
400: if (sender == null) {
401: if (log.isErrorEnabled())
402: log.error("sender is null !");
403: return;
404: }
405:
406: String id = hdr.state_id; // id could be null, which means get the entire state
407: synchronized (state_requesters) {
408: boolean empty = state_requesters.size() == 0;
409: Set requesters = (Set) state_requesters.get(id);
410: if (requesters == null) {
411: requesters = new HashSet();
412: state_requesters.put(id, requesters);
413: }
414: requesters.add(sender);
415:
416: if (!isDigestNeeded()) { // state transfer is in progress, digest was already requested
417: requestApplicationStates();
418: } else if (empty) {
419: digest = null;
420: if (log.isDebugEnabled())
421: log.debug("passing down GET_DIGEST_STATE");
422: passDown(new Event(Event.GET_DIGEST_STATE));
423: }
424: }
425: }
426:
427: /** Set the digest and the send the state up to the application */
428: void handleStateRsp(StateHeader hdr, byte[] state) {
429: Address sender = hdr.sender;
430: Digest tmp_digest = hdr.my_digest;
431: String id = hdr.state_id;
432:
433: waiting_for_state_response = false;
434: if (isDigestNeeded()) {
435: if (tmp_digest == null) {
436: if (log.isWarnEnabled())
437: log.warn("digest received from " + sender
438: + " is null, skipping setting digest !");
439: } else
440: passDown(new Event(Event.SET_DIGEST, tmp_digest)); // set the digest (e.g. in NAKACK)
441: }
442: stop = System.currentTimeMillis();
443:
444: // resume sending and handling of message garbage collection gossip messages,
445: // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage
446: // collection protocol (e.g. STABLE)
447: if (log.isDebugEnabled())
448: log.debug("passing down a RESUME_STABLE event");
449: passDown(new Event(Event.RESUME_STABLE));
450:
451: if (state == null) {
452: if (log.isWarnEnabled())
453: log
454: .warn("state received from "
455: + sender
456: + " is null, will return null state to application");
457: } else
458: log.debug("received state, size=" + state.length
459: + " bytes. Time=" + (stop - start)
460: + " milliseconds");
461: StateTransferInfo info = new StateTransferInfo(null, id, 0L,
462: state);
463: passUp(new Event(Event.GET_STATE_OK, info));
464: }
465:
466: private boolean startFlush(long timeout, int numberOfAttempts) {
467: boolean successfulFlush = false;
468: flush_promise.reset();
469: passUp(new Event(Event.SUSPEND));
470: try {
471: Boolean r = (Boolean) flush_promise
472: .getResultWithTimeout(timeout);
473: successfulFlush = r.booleanValue();
474: } catch (TimeoutException e) {
475: log.warn("Initiator of flush and state requesting member "
476: + local_addr
477: + " timed out waiting for flush responses after "
478: + flush_timeout + " msec");
479: }
480:
481: if (!successfulFlush && numberOfAttempts > 0) {
482: long backOffSleepTime = Util.random(5000);
483: if (log.isInfoEnabled())
484: log.info("Flush in progress detected at " + local_addr
485: + ". Backing off for " + backOffSleepTime
486: + " ms. Attempts left " + numberOfAttempts);
487:
488: Util.sleepRandom(backOffSleepTime);
489: successfulFlush = startFlush(flush_timeout,
490: --numberOfAttempts);
491: }
492: return successfulFlush;
493: }
494:
495: private void stopFlush() {
496: passUp(new Event(Event.RESUME));
497: }
498:
499: /* ------------------------ End of Private Methods ------------------------------ */
500:
501: /**
502: * Wraps data for a state request/response. Note that for a state response the actual state will <em>not</em
503: * be stored in the header itself, but in the message's buffer.
504: *
505: */
506: public static class StateHeader extends Header implements
507: Streamable {
508: public static final byte STATE_REQ = 1;
509: public static final byte STATE_RSP = 2;
510:
511: long id = 0; // state transfer ID (to separate multiple state transfers at the same time)
512: byte type = 0;
513: Address sender; // sender of state STATE_REQ or STATE_RSP
514: Digest my_digest = null; // digest of sender (if type is STATE_RSP)
515: String state_id = null; // for partial state transfer
516:
517: public StateHeader() { // for externalization
518: }
519:
520: public StateHeader(byte type, Address sender, long id,
521: Digest digest) {
522: this .type = type;
523: this .sender = sender;
524: this .id = id;
525: this .my_digest = digest;
526: }
527:
528: public StateHeader(byte type, Address sender, long id,
529: Digest digest, String state_id) {
530: this .type = type;
531: this .sender = sender;
532: this .id = id;
533: this .my_digest = digest;
534: this .state_id = state_id;
535: }
536:
537: public int getType() {
538: return type;
539: }
540:
541: public Digest getDigest() {
542: return my_digest;
543: }
544:
545: public String getStateId() {
546: return state_id;
547: }
548:
549: public boolean equals(Object o) {
550: StateHeader other;
551:
552: if (sender != null && o != null) {
553: if (!(o instanceof StateHeader))
554: return false;
555: other = (StateHeader) o;
556: return sender.equals(other.sender) && id == other.id;
557: }
558: return false;
559: }
560:
561: public int hashCode() {
562: if (sender != null)
563: return sender.hashCode() + (int) id;
564: else
565: return (int) id;
566: }
567:
568: public String toString() {
569: StringBuffer sb = new StringBuffer();
570: sb.append("type=").append(type2Str(type));
571: if (sender != null)
572: sb.append(", sender=").append(sender).append(" id=")
573: .append(id);
574: if (my_digest != null)
575: sb.append(", digest=").append(my_digest);
576: if (state_id != null)
577: sb.append(", state_id=").append(state_id);
578: return sb.toString();
579: }
580:
581: static String type2Str(int t) {
582: switch (t) {
583: case STATE_REQ:
584: return "STATE_REQ";
585: case STATE_RSP:
586: return "STATE_RSP";
587: default:
588: return "<unknown>";
589: }
590: }
591:
592: public void writeExternal(ObjectOutput out) throws IOException {
593: out.writeObject(sender);
594: out.writeLong(id);
595: out.writeByte(type);
596: out.writeObject(my_digest);
597: if (state_id == null) {
598: out.writeBoolean(false);
599: } else {
600: out.writeBoolean(true);
601: out.writeUTF(state_id);
602: }
603: }
604:
605: public void readExternal(ObjectInput in) throws IOException,
606: ClassNotFoundException {
607: sender = (Address) in.readObject();
608: id = in.readLong();
609: type = in.readByte();
610: my_digest = (Digest) in.readObject();
611: if (in.readBoolean())
612: state_id = in.readUTF();
613: }
614:
615: public void writeTo(DataOutputStream out) throws IOException {
616: out.writeByte(type);
617: out.writeLong(id);
618: Util.writeAddress(sender, out);
619: Util.writeStreamable(my_digest, out);
620: Util.writeString(state_id, out);
621: }
622:
623: public void readFrom(DataInputStream in) throws IOException,
624: IllegalAccessException, InstantiationException {
625: type = in.readByte();
626: id = in.readLong();
627: sender = Util.readAddress(in);
628: my_digest = (Digest) Util.readStreamable(Digest.class, in);
629: state_id = Util.readString(in);
630: }
631:
632: public long size() {
633: long retval = Global.LONG_SIZE + Global.BYTE_SIZE; // id and type
634:
635: retval += Util.size(sender);
636:
637: retval += Global.BYTE_SIZE; // presence byte for my_digest
638: if (my_digest != null)
639: retval += my_digest.serializedSize();
640:
641: retval += Global.BYTE_SIZE; // presence byte for state_id
642: if (state_id != null)
643: retval += state_id.length() + 2;
644: return retval;
645: }
646:
647: }
648:
649: }
|